Skip to content

Commit

Permalink
ARTEMIS-4207 Redistribution could leave messages stranded in the folder
Browse files Browse the repository at this point in the history
- redistribute received the handle call, it then copies the message
- the routing table changes
- the message is left behind

With the new version of the server these messages will be removed. But we should remove these right away
  • Loading branch information
clebertsuconic committed Mar 15, 2023
1 parent 4babdab commit 3a56015
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ public LargeServerMessage createLargeMessage() {

@Override
public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Initializing large message {}", id, new Exception("trace"));
}
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated()) {
replicator.largeMessageBegin(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public static Message checkLargeMessage(Message message, StorageManager storageM

private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
long id = storageManager.generateID();
if (logger.isDebugEnabled()) {
logger.debug("asLargeMessage create largeMessage with id={}", id);
}
LargeServerMessage lsm = storageManager.createLargeMessage(id, coreMessage);
ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = messageBodyBuffer.readableBytes();

Expand Down Expand Up @@ -306,6 +310,9 @@ public LargeBody getLargeBody() {
@Override
public Message copy(final long newID) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Copy large message id={} as newID={}", this.getMessageID(), newID);
}
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
newMessage.releaseResources(true, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding

public static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");

private final Executor postOfficeExecutor;

private final AddressManager addressManager;

private final QueueFactory queueFactory;
Expand Down Expand Up @@ -192,6 +194,8 @@ public PostOfficeImpl(final ActiveMQServer server,

this.addressSettingsRepository = addressSettingsRepository;

this.postOfficeExecutor = server.getExecutorFactory().getExecutor();

this.server = server;
}

Expand Down Expand Up @@ -1391,7 +1395,8 @@ public Pair<RoutingContext, Message> redistribute(final Message message,
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
final Message copyRedistribute = message.copy(storageManager.generateID());
logger.info("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
copyRedistribute.setAddress(message.getAddress());

RoutingContext context = new RoutingContextImpl(tx);
Expand All @@ -1400,6 +1405,20 @@ public Pair<RoutingContext, Message> redistribute(final Message message,

if (routed) {
return new Pair<>(context, copyRedistribute);
} else {
// things have changed, we are not redistributing any more
if (copyRedistribute.isLargeMessage()) {
LargeServerMessage lsm = (LargeServerMessage) copyRedistribute;
postOfficeExecutor.execute(() -> {
try {
logger.debug("Removing large message {} since the routing tables have changed", lsm.getAppendFile());
lsm.deleteFile();
} catch (Exception e) {
logger.warn("Error removing {}", copyRedistribute);
}
});
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ public ServerSessionPacketHandler(final ActiveMQServer server,
}

private void clearLargeMessage() {
if (currentLargeMessage != null) {
logger.debug("pending large message on session being removed {}", currentLargeMessage);
}
synchronized (largeMessageLock) {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
logger.debug("Remove file {} after a failed session", currentLargeMessage.getAppendFile());
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
Expand Down Expand Up @@ -1069,12 +1073,29 @@ private void sendLarge(final Message message) throws Exception {
// need to create the LargeMessage before continue
long id = storageManager.generateID();

if (logger.isDebugEnabled()) {
logger.debug("initializing large message {}", id);
}
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);

logger.trace("sendLarge::{}", largeMsg);

if (currentLargeMessage != null) {
ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());

// this shouldn't really happen.
// Adding this just in case
final LargeServerMessage replaced = currentLargeMessage;
callExecutor.execute(() -> {
try {
if (replaced != null) {
logger.debug("Replaced failed being removed over interrupted send for message {}", replaced);
replaced.deleteFile();
}
} catch (Exception e) {
logger.warn("Error removing currentLargeMessage {}", replaced);
}
});
}

currentLargeMessage = largeMsg;
Expand Down Expand Up @@ -1106,11 +1127,19 @@ private void sendContinuations(final int packetSize,
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;

logger.info("Sending {}", message.getMessageID());
try {
session.send(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), false, producers.get(senderID), false);
logger.info("Sending finished on {}", message.getMessageID());
} catch (Exception e) {
message.deleteFile();
throw e;
} catch (Throwable e) {
logger.warn("********************************************************************************");
logger.warn("Throwable on currentLargeMessage {}", message.getMessageID(), e);
logger.warn("********************************************************************************");

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.cluster.impl;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;

Expand All @@ -32,9 +33,13 @@
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Redistributor implements Consumer {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private boolean active;

private final StorageManager storageManager;
Expand Down Expand Up @@ -113,6 +118,7 @@ public synchronized HandleStatus handle(final MessageReference reference) throws
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);

if (routingInfo == null) {
logger.debug("postOffice.redistribute return null for message {}", reference);
tx.rollback();
return HandleStatus.BUSY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,27 +118,52 @@ private Process startServer1() throws Exception {

@Test
public void testLargeMessageAMQPTX() throws Throwable {
testInterrupt("AMQP", true);
testInterrupt("AMQP", true, false);
}

@Test
public void testLargeMessageAMQPTXKill() throws Throwable {
testInterrupt("AMQP", true, true);
}

@Test
public void testInterruptAMQPNonTX() throws Throwable {
testInterrupt("AMQP", false);
testInterrupt("AMQP", false, false);
}

@Test
public void testInterruptAMQPNonTXKill() throws Throwable {
testInterrupt("AMQP", false, true);
}

@Test
public void testInterruptCORETX() throws Throwable {
testInterrupt("CORE", true);
testInterrupt("CORE", true, false);
}

@Test
public void testInterruptCORETXKill() throws Throwable {
testInterrupt("CORE", true, true);
}

@Test
public void testInterruptOPENWIRETX() throws Throwable {
testInterrupt("OPENWIRE", true);
testInterrupt("OPENWIRE", true, false);
}

@Test
public void testInterruptOPENWIRETXKill() throws Throwable {
testInterrupt("OPENWIRE", true, true);
}

@Test
public void testInterruptCORENonTX() throws Throwable {
testInterrupt("CORE", false);
testInterrupt("CORE", false, false);
}

@Test
public void testInterruptCORENonTXKill() throws Throwable {
testInterrupt("CORE", false, true);
}

private CountDownLatch startSendingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
Expand Down Expand Up @@ -227,7 +252,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,

// this test has sleeps as the test will send while still active
// we keep sending all the time.. so the testInterruptLM acts like a controller telling the threads when to stop
private void testInterrupt(String protocol, boolean tx) throws Throwable {
private void testInterrupt(String protocol, boolean tx, boolean useKill) throws Throwable {
final int SENDING_THREADS = 10;
final int CONSUMING_THREADS = 10;
final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
Expand All @@ -242,7 +267,7 @@ private void testInterrupt(String protocol, boolean tx) throws Throwable {

Thread.sleep(2000);

serverProcess.destroyForcibly();
killProcess(serverProcess, useKill);
runningSend = false;
runningConsumer = false;
Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
Expand All @@ -258,7 +283,7 @@ private void testInterrupt(String protocol, boolean tx) throws Throwable {
sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

serverProcess2.destroyForcibly();
killProcess(serverProcess2, useKill);
Assert.assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
runningSend = false;
runningConsumer = false;
Expand All @@ -277,17 +302,17 @@ private void testInterrupt(String protocol, boolean tx) throws Throwable {
QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);

Wait.assertEquals(0, queueControl1::getMessageCount);
Wait.assertEquals(0, queueControl2::getMessageCount);
File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");

Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0);

runningConsumer = false;
Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));

File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");

Wait.assertEquals(0, () -> lmFolder.listFiles().length);
Wait.assertEquals(0, () -> lmFolder2.listFiles().length);
// no need to use wait here, the previous check should have checked that already
Assert.assertEquals(0, lmFolder.listFiles().length);
Assert.assertEquals(0, lmFolder2.listFiles().length);
Assert.assertEquals(0, errors.get());
}

Expand All @@ -301,6 +326,14 @@ public void testBridgeFailureCORE() throws Throwable {
testInterruptFailOnBridge("CORE", false);
}

private void killProcess(Process process, boolean useKill) throws Exception {
if (useKill) {
Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
} else {
process.destroyForcibly();
}
}


// this is a slight variation of testInterruptLM where I switch over consumers before killing the previous node
// this is to force messages being redistributed and try to get the bridge to failure.
Expand All @@ -322,13 +355,13 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw

runningSend = runningConsumer = false;

serverProcess.destroyForcibly();
killProcess(serverProcess, false);
Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
serverProcess.destroyForcibly();
killProcess(serverProcess, false);
Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
serverProcess = startServer0();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
testInterruptLM("CORE", false, true);
}

private void killProcess(Process process) throws Exception {
Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
}


private void testInterruptLM(String protocol, boolean tx, boolean paging) throws Throwable {
final int BODY_SIZE = 500 * 1024;
final int NUMBER_OF_MESSAGES = 10; // this is per producer
Expand Down Expand Up @@ -213,7 +218,8 @@ private void testInterruptLM(String protocol, boolean tx, boolean paging) throws
}

Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
serverProcess.destroyForcibly();
killProcess(serverProcess);
Assert.assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer(SERVER_NAME_0, 0, 0);

Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
Expand Down

0 comments on commit 3a56015

Please sign in to comment.