Skip to content

Commit

Permalink
ARTEMIS-4207 Improving redistribution fix over large messages
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Apr 10, 2023
1 parent d23ced5 commit e368dac
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public void onIOException(Throwable code, String message, String file) {
});
}

public NullStorageManager(int nextId) {
this();
this.setNextId(nextId);
}

@Override
public void criticalError(Throwable error) {
ioCriticalErrorListener.onIOException(error, error.getMessage(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ default MessageReference reroute(Message message, Queue queue, Transaction tx) t
MessageReference reload(Message message, Queue queue, Transaction tx) throws Exception;

Pair<RoutingContext, Message> redistribute(Message message,
Queue originatingQueue,
Transaction tx) throws Exception;
Queue originatingQueue) throws Exception;

void processRoute(Message message, RoutingContext context, boolean direct) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
Expand All @@ -45,8 +46,8 @@
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -76,7 +77,7 @@ public final class BindingsImpl implements Bindings {

private final SimpleString name;

private final IDGenerator idGenerator;
private final StorageManager storageManager;

private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);

Expand All @@ -85,9 +86,9 @@ public final class BindingsImpl implements Bindings {
*/
private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());

public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, IDGenerator idGenerator) {
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, StorageManager storageManager) {
this.groupingHandler = groupingHandler;
this.idGenerator = idGenerator;
this.storageManager = storageManager;
this.name = name;
}

Expand Down Expand Up @@ -235,12 +236,16 @@ public Message redistribute(final Message message,
// The message needs a new ID during the redistribution
// We have to create the new ID only after we can guarantee it will be routed
// otherwise we may leave large messages stranded in the folder
final Message copyRedistribute = message.copy(idGenerator.generateID());
final Message copyRedistribute = message.copy(storageManager.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());

if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
}

bindingIndex.setIndex(nextPosition);
nextBinding.route(copyRedistribute, context);
return copyRedistribute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,12 +1387,11 @@ public MessageReference reload(final Message message, final Queue queue, final T
*/
@Override
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
final Queue originatingQueue) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());

if (bindings != null && bindings.allowRedistribute()) {
RoutingContext context = new RoutingContextImpl(tx);
RoutingContext context = new RoutingContextImpl(null);

// the redistributor will make a copy of the message if it can be redistributed
Message redistributedMessage = bindings.redistribute(message, originatingQueue, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,23 +113,23 @@ public synchronized HandleStatus handle(final MessageReference reference) throws
return HandleStatus.NO_MATCH;
}

final Transaction tx = new TransactionImpl(storageManager);

final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue);

if (routingInfo == null) {
logger.debug("postOffice.redistribute return null for message {}", reference);
tx.rollback();
return HandleStatus.BUSY;
}

postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
RoutingContext context = routingInfo.getA();
Message message = routingInfo.getB();

postOffice.processRoute(message, context, false);

if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(), "redistributing");
}

ackRedistribution(reference, tx);
ackRedistribution(reference, context.getTransaction());

return HandleStatus.HANDLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
Expand All @@ -31,7 +32,6 @@
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
Expand All @@ -52,7 +52,7 @@ private static class BindingFactoryFake implements BindingsFactory {

@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
Expand All @@ -32,7 +33,6 @@
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
Expand All @@ -54,7 +54,7 @@ private static class BindingFactoryFake implements BindingsFactory {

@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
Expand All @@ -45,7 +46,6 @@
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Test;

public class BindingsImplTest extends ActiveMQTestBase {
Expand All @@ -55,7 +55,7 @@ public void testGetNextBindingWithLoadBalancingOnDemand() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(1, fake.routedCount.get());
Expand All @@ -66,7 +66,7 @@ public void testGetNextBindingWithLoadBalancingOff() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
Expand All @@ -77,7 +77,7 @@ public void testGetNextBindingWithLoadBalancingOffWithRedistribution() throws Ex
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
Expand All @@ -102,7 +102,7 @@ public void testRemoveWhileRedistributing() throws Exception {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));

final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
Expand All @@ -34,7 +35,6 @@
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Ignore;
import org.junit.Test;

Expand Down Expand Up @@ -133,7 +133,7 @@ public boolean isAddressBound(SimpleString address) throws Exception {

@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleStrin

@Override
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
final Queue originatingQueue) throws Exception {
return null;
}

Expand Down

0 comments on commit e368dac

Please sign in to comment.