Skip to content

Commit

Permalink
ARTEMIS-4247 BrokerClustering vs Mirror code improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Apr 25, 2023
1 parent 0eefc38 commit fea84e3
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
Expand Down Expand Up @@ -77,11 +77,9 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");

/** When a clustered node (from regular cluster connections) receives a message
it will have target queues associated with it
this could be from message redistribution or simply load balancing.
an that case this will have the queue associated with it */
public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");
/* In a Multi-cast address (or JMS Topics) we may in certain cases (clustered-routing for instance)
select which particular queues will receive the routing output */
public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-mr-trg-q");

// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
Expand All @@ -90,7 +88,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

final Queue snfQueue;
final ActiveMQServer server;
Expand Down Expand Up @@ -614,7 +612,7 @@ private Message createMessage(SimpleString address, SimpleString queue, Object e
public static void route(ActiveMQServer server, Message message) throws Exception {
message.setMessageID(server.getStorageManager().generateID());
RoutingContext ctx = mirrorControlRouting.get();
ctx.clear();
ctx.clear().setMirrorOption(MirrorOption.disabled);
server.getPostOffice().route(message, ctx, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public interface Bindings extends UnproposalListener {

boolean allowRedistribute();

void forEach(BiConsumer<SimpleString, Binding> bindingConsumer);
void forEach(BiConsumer<String, Binding> bindingConsumer);

int size();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
Expand Down Expand Up @@ -69,7 +69,7 @@ public final class BindingsImpl implements Bindings {
* This is the same as bindingsIdMap but indexed on the binding's uniqueName rather than ID. Two maps are
* maintained to speed routing, otherwise we'd have to loop through the bindingsIdMap when routing to an FQQN.
*/
private final Map<SimpleString, Binding> bindingsNameMap = new ConcurrentHashMap<>();
private final Map<String, Binding> bindingsNameMap = new ConcurrentHashMap<>();

private final Set<Binding> exclusiveBindings = new CopyOnWriteArraySet<>();

Expand Down Expand Up @@ -123,7 +123,7 @@ public void unproposed(SimpleString groupID) {

@Override
public Binding getBinding(String name) {
return bindingsNameMap.get(SimpleString.toSimpleString(name));
return bindingsNameMap.get(name);
}

@Override
Expand All @@ -138,7 +138,7 @@ public void addBinding(final Binding binding) {
}

bindingsIdMap.put(binding.getID(), binding);
bindingsNameMap.put(binding.getUniqueName(), binding);
bindingsNameMap.put(String.valueOf(binding.getUniqueName()), binding);

if (binding instanceof RemoteQueueBinding) {
setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
Expand All @@ -162,7 +162,7 @@ private void updated() {

@Override
public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) {
final Binding binding = bindingsNameMap.remove(bindingUniqueName);
final Binding binding = bindingsNameMap.remove(String.valueOf(bindingUniqueName));
if (binding == null) {
return null;
}
Expand All @@ -174,7 +174,7 @@ public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) {
}

bindingsIdMap.remove(binding.getID());
assert !bindingsNameMap.containsKey(binding.getUniqueName());
assert !bindingsNameMap.containsKey(String.valueOf(binding.getUniqueName()));

if (logger.isTraceEnabled()) {
logger.trace("Removing binding {} from {} bindingTable: {}", binding, this, debugBindings());
Expand All @@ -191,7 +191,7 @@ public boolean allowRedistribute() {
}

@Override
public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
public void forEach(BiConsumer<String, Binding> bindingConsumer) {
bindingsNameMap.forEach(bindingConsumer);
}

Expand Down Expand Up @@ -313,7 +313,7 @@ private void route(final Message message,
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
context.clear().setReusable(false);
final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
final Binding theBinding = bindingsNameMap.get(String.valueOf(CompositeAddress.extractQueueName(message.getAddressSimpleString())));
if (theBinding != null) {
theBinding.route(message, context);
}
Expand Down Expand Up @@ -601,6 +601,7 @@ private void routeFromCluster(final Message message,
final byte[] ids) throws Exception {
if (!context.isMirrorDisabled()) {
context.setMirrorOption(MirrorOption.individualRoute);
context.setReusable(false);
}
byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public interface RoutingContext {
*/
boolean isReusable();

/** If the routing is from MirrorController, we don't redo mirrorController
* to avoid*/
MirrorOption getMirrorOption();

void forEachDurable(Consumer<Queue> consumer);
Expand Down Expand Up @@ -114,5 +112,11 @@ public interface RoutingContext {

ServerSession getServerSession();

enum MirrorOption {
enabled,
disabled,
individualRoute
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
Expand Down Expand Up @@ -708,9 +708,7 @@ public QueueImpl(final QueueConfiguration queueConfiguration,

this.server = server;

if (queueConfiguration.isInternal()) {
this.internalQueue = queueConfiguration.isInternal();
}
this.internalQueue = queueConfiguration.isInternal();

scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
Expand Down Expand Up @@ -168,9 +167,7 @@ public RoutingContextImpl clear() {

this.internalOnly = null;

if (mirrorOption == MirrorOption.individualRoute) {
mirrorOption = MirrorOption.enabled;
}
mirrorOption = MirrorOption.enabled;

return this;
}
Expand Down

0 comments on commit fea84e3

Please sign in to comment.