Skip to content

Commit

Permalink
ARTEMIS-3753 Prevent sending message to internal queues on mirror
Browse files Browse the repository at this point in the history
In cluster configuration messages could be routed to internal queues for
further delivering on different broker. We need to check that before
sending to SNF, otherwise message can stuck on target server and will
never receive ACK.

co-author: Clebert Suconic

Discusssions on apache#4012 and apache#4038

(cherry picked from commit 99302b1)

downstream: ENTMQBR-6592
  • Loading branch information
iliya-gr authored and clebertsuconic committed Aug 19, 2022
1 parent aa57467 commit 6e42366
Show file tree
Hide file tree
Showing 7 changed files with 1,176 additions and 6 deletions.
Expand Up @@ -201,6 +201,13 @@ public void sendMessage(Message message, RoutingContext context, List<MessageRef
return;
}

if (context.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid sending to internal queue");
}
return;
}

if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
Expand Down Expand Up @@ -459,7 +460,7 @@ private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageComp
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);

routingContext.clear().setMirrorSource(this);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();
Expand Down
Expand Up @@ -325,6 +325,14 @@ private void handleScaledDownMessage(final Message message, final byte[] ids) {
}
}

private MessageLoadBalancingType getMessageLoadBalancingType(RoutingContext context) {
if (context.getLoadBalancingType() != null) {
return context.getLoadBalancingType();
} else {
return this.messageLoadBalancingType;
}
}

private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
Expand All @@ -333,7 +341,7 @@ private void simpleRouting(final Message message,
}

routingNameBindingMap.forEachBindings((bindings, nextPosition) -> {
final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
final Binding nextBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
Expand Down Expand Up @@ -362,7 +370,8 @@ public String toString() {
*/
private Binding getNextBinding(final Message message,
final Binding[] bindings,
final CopyOnWriteBindings.BindingIndex bindingIndex) {
final CopyOnWriteBindings.BindingIndex bindingIndex,
final MessageLoadBalancingType loadBalancingType) {
int nextPosition = bindingIndex.getIndex();

final int bindingsCount = bindings.length;
Expand All @@ -373,8 +382,6 @@ private Binding getNextBinding(final Message message,

Binding nextBinding = null;
int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;

for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
Expand Down Expand Up @@ -438,7 +445,7 @@ private void routeUsingStrictOrdering(final Message message,

if (resp == null) {
// ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, bindings, nextPosition);
Binding theBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (theBinding == null) {
return;
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;

Expand All @@ -41,6 +42,9 @@ public interface RoutingContext {
* to avoid*/
boolean isMirrorController();

/** return true if every queue routed is internal */
boolean isInternal();

MirrorController getMirrorSource();

RoutingContext setMirrorSource(MirrorController mirrorController);
Expand Down Expand Up @@ -95,5 +99,9 @@ public interface RoutingContext {

RoutingContext setDuplicateDetection(boolean value);

RoutingContext setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);

MessageLoadBalancingType getLoadBalancingType();


}
Expand Up @@ -31,6 +31,7 @@
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
Expand All @@ -52,13 +53,18 @@ public class RoutingContextImpl implements RoutingContext {

private RoutingType previousRoutingType;

// if we wanted to bypass the load balancing configured elsewhere
private MessageLoadBalancingType loadBalancingType;

/* To be set by the Mirror target on the server, to avoid ping pongs or reflections of messages between mirrors */
private MirrorController mirrorControllerSource;

private RoutingType routingType;

Boolean reusable = null;

Boolean internalOnly = null;

volatile int version;

private final Executor executor;
Expand Down Expand Up @@ -95,6 +101,11 @@ public boolean isReusable() {
return reusable != null && reusable;
}

@Override
public boolean isInternal() {
return internalOnly != null && internalOnly;
}

@Override
public int getPreviousBindingsVersion() {
return version;
Expand Down Expand Up @@ -138,6 +149,8 @@ public RoutingContext clear() {

this.reusable = null;

this.internalOnly = null;

return this;
}

Expand All @@ -163,6 +176,13 @@ public void addQueue(final SimpleString address, final Queue queue) {
listing.getNonDurableQueues().add(queue);
}

if (internalOnly == null) {
internalOnly = true;
}

// every queue added has to be internal only
internalOnly = internalOnly && queue.isInternalQueue();

queueCount++;
}

Expand Down Expand Up @@ -198,6 +218,16 @@ private void internalprocessReferences(final List<MessageReference> refs, final
}
}

@Override
public RoutingContextImpl setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.loadBalancingType = messageLoadBalancingType;
return this;
}

@Override
public MessageLoadBalancingType getLoadBalancingType() {
return loadBalancingType;
}

@Override
public void addQueueWithAck(SimpleString address, Queue queue) {
Expand Down

0 comments on commit 6e42366

Please sign in to comment.