Skip to content
Permalink
Browse files
This closes #4012
  • Loading branch information
clebertsuconic committed Apr 23, 2022
2 parents c6bfe34 + 99302b1 commit a38fae1fbd21625203e687623e119a7b56a3e473
Showing 7 changed files with 1,176 additions and 6 deletions.
@@ -201,6 +201,13 @@ public void sendMessage(Message message, RoutingContext context, List<MessageRef
return;
}

if (context.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid sending to internal queue");
}
return;
}

if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}
@@ -33,6 +33,7 @@
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -459,7 +460,7 @@ private boolean sendMessage(AMQPMessage message, ACKMessageOperation messageComp
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);

routingContext.clear().setMirrorSource(this);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();
@@ -325,6 +325,14 @@ private void handleScaledDownMessage(final Message message, final byte[] ids) {
}
}

private MessageLoadBalancingType getMessageLoadBalancingType(RoutingContext context) {
if (context.getLoadBalancingType() != null) {
return context.getLoadBalancingType();
} else {
return this.messageLoadBalancingType;
}
}

private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
@@ -333,7 +341,7 @@ private void simpleRouting(final Message message,
}

routingNameBindingMap.forEachBindings((bindings, nextPosition) -> {
final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
final Binding nextBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
@@ -362,7 +370,8 @@ public String toString() {
*/
private Binding getNextBinding(final Message message,
final Binding[] bindings,
final CopyOnWriteBindings.BindingIndex bindingIndex) {
final CopyOnWriteBindings.BindingIndex bindingIndex,
final MessageLoadBalancingType loadBalancingType) {
int nextPosition = bindingIndex.getIndex();

final int bindingsCount = bindings.length;
@@ -373,8 +382,6 @@ private Binding getNextBinding(final Message message,

Binding nextBinding = null;
int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;

for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
@@ -438,7 +445,7 @@ private void routeUsingStrictOrdering(final Message message,

if (resp == null) {
// ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, bindings, nextPosition);
Binding theBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (theBinding == null) {
return;
}
@@ -22,6 +22,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;

@@ -41,6 +42,9 @@ public interface RoutingContext {
* to avoid*/
boolean isMirrorController();

/** return true if every queue routed is internal */
boolean isInternal();

MirrorController getMirrorSource();

RoutingContext setMirrorSource(MirrorController mirrorController);
@@ -95,5 +99,9 @@ public interface RoutingContext {

RoutingContext setDuplicateDetection(boolean value);

RoutingContext setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);

MessageLoadBalancingType getLoadBalancingType();


}
@@ -31,6 +31,7 @@
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@@ -52,13 +53,18 @@ public class RoutingContextImpl implements RoutingContext {

private RoutingType previousRoutingType;

// if we wanted to bypass the load balancing configured elsewhere
private MessageLoadBalancingType loadBalancingType;

/* To be set by the Mirror target on the server, to avoid ping pongs or reflections of messages between mirrors */
private MirrorController mirrorControllerSource;

private RoutingType routingType;

Boolean reusable = null;

Boolean internalOnly = null;

volatile int version;

private final Executor executor;
@@ -95,6 +101,11 @@ public boolean isReusable() {
return reusable != null && reusable;
}

@Override
public boolean isInternal() {
return internalOnly != null && internalOnly;
}

@Override
public int getPreviousBindingsVersion() {
return version;
@@ -138,6 +149,8 @@ public RoutingContext clear() {

this.reusable = null;

this.internalOnly = null;

return this;
}

@@ -163,6 +176,13 @@ public void addQueue(final SimpleString address, final Queue queue) {
listing.getNonDurableQueues().add(queue);
}

if (internalOnly == null) {
internalOnly = true;
}

// every queue added has to be internal only
internalOnly = internalOnly && queue.isInternalQueue();

queueCount++;
}

@@ -198,6 +218,16 @@ private void internalprocessReferences(final List<MessageReference> refs, final
}
}

@Override
public RoutingContextImpl setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.loadBalancingType = messageLoadBalancingType;
return this;
}

@Override
public MessageLoadBalancingType getLoadBalancingType() {
return loadBalancingType;
}

@Override
public void addQueueWithAck(SimpleString address, Queue queue) {

0 comments on commit a38fae1

Please sign in to comment.