Skip to content

Commit

Permalink
ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror
Browse files Browse the repository at this point in the history
- activemq.notifications are being transferred to the target node, unless an ignore is setup
- topics are being duplicated after redistribution
- topics sends are being duplicated when a 2 node cluster mirrors to another 2 node cluster, and both nodes are mirrored.
  • Loading branch information
clebertsuconic authored and jbertram committed Apr 21, 2023
1 parent 4e77a34 commit 2a81a0a
Show file tree
Hide file tree
Showing 16 changed files with 733 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
Expand Down Expand Up @@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");

/** When a clustered node (from regular cluster connections) receives a message
it will have target queues associated with it
this could be from message redistribution or simply load balancing.
an that case this will have the queue associated with it */
public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");

// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true));
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));

final Queue snfQueue;
final ActiveMQServer server;
Expand Down Expand Up @@ -228,13 +235,13 @@ private boolean sameNode(String remoteID, String sourceID) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);

if (invalidTarget(context.getMirrorSource())) {
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
return;
}

if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
if (invalidTarget(context.getMirrorSource())) {
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
return;
}

Expand All @@ -256,7 +263,7 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
}

MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
setProtocolData(ref, nodeID, idSupplier.getID(ref));
setProtocolData(ref, nodeID, idSupplier.getID(ref), context);

snfQueue.refUp(ref);

Expand Down Expand Up @@ -330,12 +337,12 @@ private static String setProtocolData(ReferenceNodeStore referenceIDSupplier, Me
String brokerID = referenceIDSupplier.getServerID(ref);
long id = referenceIDSupplier.getID(ref);

setProtocolData(ref, brokerID, id);
setProtocolData(ref, brokerID, id, null);

return brokerID;
}

private static void setProtocolData(MessageReference ref, String brokerID, long id) {
private static void setProtocolData(MessageReference ref, String brokerID, long id, RoutingContext routingContext) {
Map<Symbol, Object> daMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);

Expand All @@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
}
}

if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
ArrayList<String> queues = new ArrayList<>();
routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
daMap.put(TARGET_QUEUES, queues);
}

ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.util.Collection;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;

Expand All @@ -29,6 +30,8 @@
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -73,6 +76,7 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;

public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {

Expand Down Expand Up @@ -436,6 +440,8 @@ private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageComp
Long internalIDLong = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
String internalAddress = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_DESTINATION);

Collection<String> targetQueues = (Collection) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, TARGET_QUEUES);

long internalID = 0;

if (internalIDLong != null) {
Expand Down Expand Up @@ -479,8 +485,13 @@ private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageComp
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);

routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY);
if (targetQueues != null) {
targetQueuesRouting(message, routingContext, targetQueues);
server.getPostOffice().processRoute(message, routingContext, false);
} else {
server.getPostOffice().route(message, routingContext, false);
}
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();
flow();
Expand All @@ -489,6 +500,24 @@ private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageComp
return true;
}

/** When the source mirror receives messages from a cluster member of his own, it should then fill targetQueues so we could play the same semantic the source applied on its routing */
private void targetQueuesRouting( final Message message,
final RoutingContext context,
final Collection<String> queueNames) throws Exception {
Bindings bindings = server.getPostOffice().getBindingsForAddress(message.getAddressSimpleString());
queueNames.forEach(name -> {
Binding binding = bindings.getBinding(name);
if (binding != null) {
try {
binding.route(message, context);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
});
}


@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice;

import java.util.Collection;
import java.util.function.BiConsumer;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand All @@ -32,6 +33,8 @@ public interface Bindings extends UnproposalListener {

Collection<Binding> getBindings();

Binding getBinding(String name);

void addBinding(Binding binding);

Binding removeBindingByUniqueName(SimpleString uniqueName);
Expand All @@ -55,4 +58,8 @@ public interface Bindings extends UnproposalListener {
void route(Message message, RoutingContext context) throws Exception;

boolean allowRedistribute();

void forEach(BiConsumer<SimpleString, Binding> bindingConsumer);

int size();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
Expand All @@ -51,6 +52,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.BiConsumer;

public final class BindingsImpl implements Bindings {

Expand Down Expand Up @@ -119,6 +121,11 @@ public void unproposed(SimpleString groupID) {
}
}

@Override
public Binding getBinding(String name) {
return bindingsNameMap.get(SimpleString.toSimpleString(name));
}

@Override
public void addBinding(final Binding binding) {
try {
Expand Down Expand Up @@ -183,6 +190,17 @@ public boolean allowRedistribute() {
return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
}

@Override
public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
bindingsNameMap.forEach(bindingConsumer);
}

@Override
public int size() {
return bindingsNameMap.size();
}


@Override
public Message redistribute(final Message message,
final Queue originatingQueue,
Expand Down Expand Up @@ -439,6 +457,10 @@ private static boolean matchBinding(final Message message,
}
}

if (loadBalancingType.equals(MessageLoadBalancingType.LOCAL_ONLY) && binding instanceof RemoteQueueBinding) {
return false;
}

final Filter filter = binding.getFilter();

if (filter == null || filter.match(message)) {
Expand Down Expand Up @@ -577,6 +599,9 @@ private String debugBindings() {
private void routeFromCluster(final Message message,
final RoutingContext context,
final byte[] ids) throws Exception {
if (!context.isMirrorDisabled()) {
context.setMirrorOption(MirrorOption.individualRoute);
}
byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);

List<Long> idsToAckList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;

/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
* */
public enum MirrorOption {
enabled,
disabled,
individualRoute
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand All @@ -40,9 +41,15 @@ public interface RoutingContext {

/** If the routing is from MirrorController, we don't redo mirrorController
* to avoid*/
MirrorOption getMirrorOption();

void forEachDurable(Consumer<Queue> consumer);

RoutingContext setMirrorOption(MirrorOption option);

boolean isMirrorDisabled();

RoutingContext setMirrorDisabled(boolean mirrorDisabled);
boolean isMirrorIndividualRoute();

/** return true if every queue routed is internal */
boolean isInternal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) {
} else {
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true).setMaxConsumers(-1).setPurgeOnNoConsumers(false).setInternal(true));
}

// There are a few things that will behave differently when it's an internal queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package org.apache.activemq.artemis.core.server.cluster.impl;

public enum MessageLoadBalancingType {
OFF, STRICT, ON_DEMAND, OFF_WITH_REDISTRIBUTION;
OFF, STRICT, ON_DEMAND, OFF_WITH_REDISTRIBUTION, LOCAL_ONLY; // notice that LOCAL_ONLY is an internal use only option. When Mirror sends a message to a target mirror, messages should be routed locally only and to not any other cluster.
}
Original file line number Diff line number Diff line change
Expand Up @@ -3986,7 +3986,7 @@ public Queue createQueue(final QueueConfiguration queueConfiguration, boolean ig
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration));
}

if (mirrorControllerService != null) {
if (mirrorControllerService != null && !queueConfiguration.isInternal()) {
mirrorControllerService.createQueue(queueConfiguration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
Expand Down Expand Up @@ -707,6 +708,10 @@ public QueueImpl(final QueueConfiguration queueConfiguration,

this.server = server;

if (queueConfiguration.isInternal()) {
this.internalQueue = queueConfiguration.isInternal();
}

scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);

if (addressSettingsRepository != null) {
Expand Down Expand Up @@ -3540,7 +3545,7 @@ private RoutingStatus move(final Transaction originalTX,
// we Disable mirror on expiration as the target might be also expiring it
// and this could cause races
// we will only send the ACK for the expiration with the reason=EXPIRE and the expire will be played on the mirror side
context.setMirrorDisabled(true);
context.setMirrorOption(MirrorOption.disabled);
}

routingStatus = postOffice.route(copyMessage, context, false, rejectDuplicate, binding);
Expand Down

0 comments on commit 2a81a0a

Please sign in to comment.