Skip to content
Permalink
Browse files
ARTEMIS-3721 AMQP Mirrored Large Message file not removed
  • Loading branch information
clebertsuconic committed Mar 15, 2022
1 parent 1ed7cc1 commit 861fe59124732420dc9a66f292aa4ada968e103a
Showing 2 changed files with 92 additions and 2 deletions.
@@ -218,8 +218,6 @@ public void sendMessage(Message message, RoutingContext context, List<MessageRef
}
snfQueue.refUp(ref);
refs.add(ref);
message.usageUp();


if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true);
@@ -60,6 +60,20 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);
ActiveMQServer server_2;

@After
public void stopServer1() throws Exception {
if (server != null) {
server.stop();
}
}

@After
public void stopServer2() throws Exception {
if (server_2 != null) {
server_2.stop();
}
}

@Before
public void startLogging() {
AssertionLoggerHandler.startCapture();
@@ -422,4 +436,82 @@ public void testSyncDataNoSuppliedID() throws Exception {
server.stop();
}

@Test
public void testLargeMessageInSync() throws Exception {
String queueName = "testSyncLargeMessage";
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.start();

server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("Server2");

{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}

server_2.start();

server_2.addAddressInfo(new AddressInfo(queueName).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));

Wait.assertTrue(() -> server_2.locateQueue(queueName) != null);
Wait.assertTrue(() -> server.locateQueue(queueName) != null);

String bigString;
{
StringBuffer bigStringBuffer = new StringBuffer();
while (bigStringBuffer.length() < 200 * 1024) {
bigStringBuffer.append("This is a big string ");
}
bigString = bigStringBuffer.toString();
}

ConnectionFactory factory1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);

try (Connection connection = factory1.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage(bigString));
}

try (Connection connection = factory2.createConnection()) {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
Wait.assertEquals(1, serverQueue::getMessageCount);
Wait.assertEquals(1, () -> getNumberOfFiles(server_2.getConfiguration().getLargeMessagesLocation()), 5000, 100);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(bigString, message.getText());
Wait.assertEquals(0, () -> getNumberOfFiles(server_2.getConfiguration().getLargeMessagesLocation()), 5000, 100);
}

try (Connection connection = factory1.createConnection()) {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);

Wait.assertEquals(0, serverQueue::getMessageCount);

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
TextMessage message = (TextMessage)consumer.receiveNoWait();
Assert.assertNull(message);
server.stop();
server_2.stop();
Wait.assertEquals(0, () -> getNumberOfFiles(server.getConfiguration().getLargeMessagesLocation()), 1000, 100);
}
}

}

0 comments on commit 861fe59

Please sign in to comment.