Skip to content

Commit

Permalink
ARTEMIS-4282 Large Header may break the broker
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed May 17, 2023
1 parent ec54576 commit 03afbed
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,13 @@ public void appendAddRecordTransactional(final long txID,
txID, id, recordType, record);
}

JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
int encodeSize = addRecord.getEncodeSize();

if (encodeSize > getMaxRecordSize()) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, getMaxRecordSize());
}

appendExecutor.execute(new Runnable() {

Expand All @@ -1276,9 +1283,7 @@ public void run() {
if (tx != null) {
tx.checkErrorCondition();
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
// we need to calculate the encodeSize here, as it may use caches that are eliminated once the record is written
int encodeSize = addRecord.getEncodeSize();
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);

if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
Expand Down Expand Up @@ -1914,7 +1915,17 @@ public synchronized RoutingStatus send(Transaction tx,

result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
try {
result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
} catch (ActiveMQIOErrorException e) {
if (tx != null) {
tx.markAsRollbackOnly(e);
}
if (message.isLargeMessage()) {
((LargeServerMessage)message).deleteFile();
}
throw e;
}
}

if (AuditLogger.isMessageLoggingEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,6 +42,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
Expand All @@ -49,6 +51,7 @@
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.transport.amqp.client.AmqpClient;
Expand Down Expand Up @@ -521,6 +524,81 @@ public void doTestSendHugeHeader(int expectedSize) throws Exception {
}
}

@Test
public void testLargeHeaderTXLargeBody() throws Exception {
Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test
testLargeHeaderTX(true);
}

@Test
public void testLargeHeaderTXSmallBody() throws Exception {
Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test
testLargeHeaderTX(false);
}

private void testLargeHeaderTX(boolean largeBody) throws Exception {
String testQueueName = RandomUtil.randomString();
server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");

String largeString;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 1024 * 1024) {
buffer.append("This is a large string ");
}
largeString = buffer.toString();
}

String smallString = "small string";

String body = largeBody ? largeString : smallString;

try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(testQueueName));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createTextMessage(body);
message.setStringProperty("test", largeString);
boolean failed = false;
try {
producer.send(message);
session.commit();
} catch (Exception expected) {
failed = true;
}
Assert.assertTrue(failed);
}

try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(testQueueName));

Message message = session.createTextMessage(body);
message.setStringProperty("test", smallString);
producer.send(message);
session.commit();

connection.start();

MessageConsumer consumer = session.createConsumer(session.createQueue(testQueueName));
TextMessage recMessage = (TextMessage) consumer.receive(5000);
Assert.assertEquals(smallString, recMessage.getStringProperty("test"));
Assert.assertEquals(body, recMessage.getText());
session.commit();

Assert.assertNull(consumer.receiveNoWait());
}

org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(testQueueName);
Wait.assertEquals(0, serverQueue::getMessageCount);

File largeMessageFolder = server.getConfiguration().getLargeMessagesLocation();
File[] files = largeMessageFolder.listFiles();
Assert.assertTrue(files == null ? "Null Files" : "There are " + files.length + " files in the large message folder", files == null || files.length == 0);
}


@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {
Expand Down

0 comments on commit 03afbed

Please sign in to comment.