Skip to content
Permalink
Browse files
ARTEMIS-3753 Prevent sending message to internal queues on mirror
In cluster configuration messages could be routed to internal queues for
further delivering on different broker. We need to check that before
sending to SNF, otherwise message can stuck on target server and will
never receive ACK.

co-author: Clebert Suconic

Discusssions on #4012 and #4038
  • Loading branch information
iliya-gr authored and clebertsuconic committed Apr 23, 2022
1 parent c6bfe34 commit 99302b193527c678b525a198f0ffa41615eca498
Showing 7 changed files with 1,176 additions and 6 deletions.
@@ -201,6 +201,13 @@ public void sendMessage(Message message, RoutingContext context, List<MessageRef
return;
}

if (context.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid sending to internal queue");
}
return;
}

if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}
@@ -33,6 +33,7 @@
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -459,7 +460,7 @@ private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageComp
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);

routingContext.clear().setMirrorSource(this);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();
@@ -325,6 +325,14 @@ private void handleScaledDownMessage(final Message message, final byte[] ids) {
}
}

private MessageLoadBalancingType getMessageLoadBalancingType(RoutingContext context) {
if (context.getLoadBalancingType() != null) {
return context.getLoadBalancingType();
} else {
return this.messageLoadBalancingType;
}
}

private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
@@ -333,7 +341,7 @@ private void simpleRouting(final Message message,
}

routingNameBindingMap.forEachBindings((bindings, nextPosition) -> {
final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
final Binding nextBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
@@ -362,7 +370,8 @@ public String toString() {
*/
private Binding getNextBinding(final Message message,
final Binding[] bindings,
final CopyOnWriteBindings.BindingIndex bindingIndex) {
final CopyOnWriteBindings.BindingIndex bindingIndex,
final MessageLoadBalancingType loadBalancingType) {
int nextPosition = bindingIndex.getIndex();

final int bindingsCount = bindings.length;
@@ -373,8 +382,6 @@ private Binding getNextBinding(final Message message,

Binding nextBinding = null;
int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;

for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
@@ -438,7 +445,7 @@ private void routeUsingStrictOrdering(final Message message,

if (resp == null) {
// ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, bindings, nextPosition);
Binding theBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (theBinding == null) {
return;
}
@@ -22,6 +22,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;

@@ -41,6 +42,9 @@ public interface RoutingContext {
* to avoid*/
boolean isMirrorController();

/** return true if every queue routed is internal */
boolean isInternal();

MirrorController getMirrorSource();

RoutingContext setMirrorSource(MirrorController mirrorController);
@@ -95,5 +99,9 @@ public interface RoutingContext {

RoutingContext setDuplicateDetection(boolean value);

RoutingContext setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);

MessageLoadBalancingType getLoadBalancingType();


}
@@ -31,6 +31,7 @@
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@@ -52,13 +53,18 @@ public class RoutingContextImpl implements RoutingContext {

private RoutingType previousRoutingType;

// if we wanted to bypass the load balancing configured elsewhere
private MessageLoadBalancingType loadBalancingType;

/* To be set by the Mirror target on the server, to avoid ping pongs or reflections of messages between mirrors */
private MirrorController mirrorControllerSource;

private RoutingType routingType;

Boolean reusable = null;

Boolean internalOnly = null;

volatile int version;

private final Executor executor;
@@ -95,6 +101,11 @@ public boolean isReusable() {
return reusable != null && reusable;
}

@Override
public boolean isInternal() {
return internalOnly != null && internalOnly;
}

@Override
public int getPreviousBindingsVersion() {
return version;
@@ -138,6 +149,8 @@ public RoutingContext clear() {

this.reusable = null;

this.internalOnly = null;

return this;
}

@@ -163,6 +176,13 @@ public void addQueue(final SimpleString address, final Queue queue) {
listing.getNonDurableQueues().add(queue);
}

if (internalOnly == null) {
internalOnly = true;
}

// every queue added has to be internal only
internalOnly = internalOnly && queue.isInternalQueue();

queueCount++;
}

@@ -198,6 +218,16 @@ private void internalprocessReferences(final List<MessageReference> refs, final
}
}

@Override
public RoutingContextImpl setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.loadBalancingType = messageLoadBalancingType;
return this;
}

@Override
public MessageLoadBalancingType getLoadBalancingType() {
return loadBalancingType;
}

@Override
public void addQueueWithAck(SimpleString address, Queue queue) {

0 comments on commit 99302b1

Please sign in to comment.