Skip to content

Commit

Permalink
ARTEMIS-4206 Unreferenced AMQP Large Messages not removed right away,…
Browse files Browse the repository at this point in the history
… requiring a reboot
  • Loading branch information
clebertsuconic committed Mar 15, 2023
1 parent 68c5bed commit 2c03738
Show file tree
Hide file tree
Showing 13 changed files with 1,803 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,10 @@ final class MutableLong {
storedLargeMessages.remove(message.getMessageID());
}

if (message.isLargeMessage()) {
largeMessages.add((LargeServerMessage) message);
}

messages.put(record.id, message);

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
Expand Down Expand Up @@ -994,6 +995,93 @@ public void testMessageWithAmqpSequencePreservesBodyType() throws Exception {
}
}


@Test
public void testDeleteUnreferencedMessage() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = createAmqpMessage((byte)'A', payload);
message.setDurable(true);
sender.send(message);
sender.close();
} finally {
connection.close();
}

final org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(getTestName());
queue.forEach(ref -> {
if (ref.getMessage().isLargeMessage()) {
try {
// simulating an ACK but the server crashed before the delete of the record, and the large message file
server.getStorageManager().storeAcknowledge(queue.getID(), ref.getMessageID());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
});

server.stop();

AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);

server.start();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));

validateNoFilesOnLargeDir();
runAfter(server::stop);
}

@Test
public void testSimpleLargeMessageRestart() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = createAmqpMessage((byte)'A', payload);
message.setDurable(true);
sender.send(message);
sender.close();
} finally {
connection.close();
}

server.stop();

AssertionLoggerHandler.startCapture();
server.start();

// These two should not happen as the consumer will receive them
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221019")); // unferenced record
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221018")); // unferenced large message

connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
message.accept();
receiver.close();
session.close();
} finally {
connection.close();
}

validateNoFilesOnLargeDir();
runAfter(server::stop);
}


private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

/**
Expand Down Expand Up @@ -58,6 +59,13 @@ protected ServerLocator createFactory(final boolean isNetty) throws Exception {
return super.createFactory(isNetty).setMinLargeMessageSize(10240).setCompressLargeMessage(true);
}

@Override
@Test
public void testDeleteUnreferencedMessage() {
// this test makes no sense as it needs to delete a large message and its record
Assume.assumeFalse(true);
}

@Test
public void testSimpleSendOnAvoid() throws Exception {
ActiveMQServer server = createServer(true, isNetty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ protected ServerLocator createFactory(final boolean isNetty) throws Exception {
return super.createFactory(isNetty).setCompressLargeMessage(true);
}

@Override
@Test
public void testDeleteUnreferencedMessage() {
// this test makes no sense as it needs to delete a large message and its record
Assume.assumeFalse(true);
}

@Test
public void testLargeMessageCompressionNotCompressedAndBrowsed() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
Expand Down Expand Up @@ -375,6 +376,51 @@ public void beforeMessageRoute(Message message, RoutingContext context, boolean
validateNoFilesOnLargeDir();
}

@Test
public void testDeleteUnreferencedMessage() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

ActiveMQServer server = createServer(true, isNetty(), storeType);

server.start();

server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));

ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));

ClientSession session = addClientSession(sf.createSession(false, true, false));
ClientProducer producer = session.createProducer(getName());

Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);

producer.send(clientFile);

session.close();

final Queue queue = server.locateQueue(getName());
queue.forEach(ref -> {
// simulating an ACK but the server crashed before the delete of the record, and the large message file
if (ref.getMessage().isLargeMessage()) {
try {
server.getStorageManager().storeAcknowledge(queue.getID(), ref.getMessageID());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
});
server.stop();

AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);

server.start();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));

validateNoFilesOnLargeDir();
runAfter(server::stop);

}


@Test
public void testPendingRecord() throws Exception {
Expand Down
60 changes: 60 additions & 0 deletions tests/soak-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,66 @@
</configuration>
</execution>

<!-- Used on ClusteredLargeMessageInterruptTest -->
<execution>
<phase>test-compile</phase>
<id>create-lmbroker1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/lmbroker1</instance>
<configuration>${basedir}/target/classes/servers/lmbroker1</configuration>
<args>
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
<arg>--clustered</arg>
<arg>--staticCluster</arg>
<arg>tcp://localhost:61716</arg>
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
<arg>--queues</arg>
<arg>ClusteredLargeMessageInterruptTest</arg>
<arg>--name</arg>
<arg>lmbroker1</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-lmbroker2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/lmbroker2</instance>
<configuration>${basedir}/target/classes/servers/lmbroker2</configuration>
<portOffset>100</portOffset>
<args>
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
<arg>--clustered</arg>
<arg>--staticCluster</arg>
<arg>tcp://localhost:61616</arg>
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
<arg>--queues</arg>
<arg>ClusteredLargeMessageInterruptTest</arg>
<arg>--name</arg>
<arg>lmbroker2</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>

Expand Down

0 comments on commit 2c03738

Please sign in to comment.