Skip to content

Commit

Permalink
ARTEMIS-4207 Improved fix on LargeMessage Redistribution
Browse files Browse the repository at this point in the history
This fix will delay the message.copy to the redistributor itself.
Meaning no copy would be performed if the redistribution itself failed.

No need to remove a copy any longer
  • Loading branch information
clebertsuconic committed Mar 27, 2023
1 parent d139ad7 commit 15d39a1
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ public interface Bindings extends UnproposalListener {

MessageLoadBalancingType getMessageLoadBalancingType();

boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
/**
*
* @param message the message being copied
* @param originatingQueue
* @param context
* @return a Copy of the message if redistribution succeeded, or null if it wasn't redistributed
* @throws Exception
*/
Message redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;

void route(Message message, RoutingContext context) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -75,15 +76,18 @@ public final class BindingsImpl implements Bindings {

private final SimpleString name;

private final IDGenerator idGenerator;

private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);

/**
* This has a version about adds and removes
*/
private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());

public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, IDGenerator idGenerator) {
this.groupingHandler = groupingHandler;
this.idGenerator = idGenerator;
this.name = name;
}

Expand Down Expand Up @@ -114,8 +118,6 @@ public void unproposed(SimpleString groupID) {
}
}



@Override
public void addBinding(final Binding binding) {
try {
Expand Down Expand Up @@ -181,12 +183,12 @@ public boolean allowRedistribute() {
}

@Override
public boolean redistribute(final Message message,
public Message redistribute(final Message message,
final Queue originatingQueue,
final RoutingContext context) throws Exception {
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false;
return null;
}

logger.trace("Redistributing message {}", message);
Expand All @@ -198,7 +200,7 @@ public boolean redistribute(final Message message,
if (bindingsAndPosition == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
return false;
return null;
}

final Binding[] bindings = bindingsAndPosition.getA();
Expand Down Expand Up @@ -227,11 +229,21 @@ public boolean redistribute(final Message message,
}
}
if (nextBinding == null) {
return false;
return null;
}

// The message needs a new ID during the redistribution
// We have to create the new ID only after we can guarantee it will be routed
// otherwise we may leave large messages stranded in the folder
final Message copyRedistribute = message.copy(idGenerator.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());

bindingIndex.setIndex(nextPosition);
nextBinding.route(message, context);
return true;
nextBinding.route(copyRedistribute, context);
return copyRedistribute;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,35 +1392,13 @@ public Pair<RoutingContext, Message> redistribute(final Message message,
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());

if (bindings != null && bindings.allowRedistribute()) {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
final Message copyRedistribute = message.copy(storageManager.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());

RoutingContext context = new RoutingContextImpl(tx);

boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

if (routed) {
return new Pair<>(context, copyRedistribute);
} else {
// things have changed, we are not redistributing any more
if (copyRedistribute.isLargeMessage()) {
LargeServerMessage lsm = (LargeServerMessage) copyRedistribute;
postOfficeExecutor.execute(() -> {
try {
logger.debug("Removing large message {} since the routing tables have changed", lsm.getAppendFile());
lsm.deleteFile();
} catch (Exception e) {
logger.warn("Error removing {}", copyRedistribute);
}
});
}
// the redistributor will make a copy of the message if it can be redistributed
Message redistributedMessage = bindings.redistribute(message, originatingQueue, context);

if (redistributedMessage != null) {
return new Pair<>(context, redistributedMessage);
}
}

Expand Down Expand Up @@ -2101,7 +2079,7 @@ public List<MessageReference> getListOnConsumer(long consumerID) {
@Override
public Bindings createBindings(final SimpleString address) {
GroupingHandler groupingHandler = server.getGroupingHandler();
BindingsImpl bindings = new BindingsImpl(CompositeAddress.extractAddressName(address), groupingHandler);
BindingsImpl bindings = new BindingsImpl(CompositeAddress.extractAddressName(address), groupingHandler, storageManager);
if (groupingHandler != null) {
groupingHandler.addListener(bindings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,42 @@ public void testRedistributionLargeMessageDirCleanup() throws Exception {
Wait.assertEquals(0, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
Wait.assertEquals(numMessages, () -> getServer(1).getConfiguration().getLargeMessagesLocation().listFiles().length);
}

@Test
public void testRedistributionLargeMessageDirCleanup2() throws Exception {
final long delay = 0;
final int numMessages = 5;

setRedistributionDelay(delay);
setupCluster(MessageLoadBalancingType.ON_DEMAND);

startServers(0, 1);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);

waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);

waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);

send(0, "queues.testaddress", numMessages, true, null);
addConsumer(0, 0, "queue0", null);

verifyReceiveAll(numMessages, 0);
removeConsumer(0);

addConsumer(1, 1, "queue0", null);
verifyReceiveAll(numMessages, 1);
servers[1].stop();

send(0, "queues.testaddress", numMessages, true, null);

Wait.assertEquals(5, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
Wait.assertEquals(numMessages, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
Expand All @@ -51,7 +52,7 @@ private static class BindingFactoryFake implements BindingsFactory {

@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null);
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
Expand All @@ -53,7 +54,7 @@ private static class BindingFactoryFake implements BindingsFactory {

@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null);
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Test;

public class BindingsImplTest extends ActiveMQTestBase {
Expand All @@ -54,7 +55,7 @@ public void testGetNextBindingWithLoadBalancingOnDemand() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(1, fake.routedCount.get());
Expand All @@ -65,7 +66,7 @@ public void testGetNextBindingWithLoadBalancingOff() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
Expand All @@ -76,7 +77,7 @@ public void testGetNextBindingWithLoadBalancingOffWithRedistribution() throws Ex
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
Expand All @@ -101,7 +102,7 @@ public void testRemoveWhileRedistributing() throws Exception {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));

final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Ignore;
import org.junit.Test;

Expand Down Expand Up @@ -132,7 +133,7 @@ public boolean isAddressBound(SimpleString address) throws Exception {

@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsImpl(address, null);
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,10 @@ public void updated(QueueBinding binding) {
}

@Override
public boolean redistribute(Message message,
public Message redistribute(Message message,
Queue originatingQueue,
RoutingContext context) throws Exception {
return false;
return null;
}

@Override
Expand Down

0 comments on commit 15d39a1

Please sign in to comment.