Skip to content

Commit

Permalink
ARTEMIS-4172 sending large msg via core skips plugins & audit log
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 16, 2023
1 parent 82fc429 commit fb169bc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ private void sendContinuations(final int packetSize,
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
try {
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false);
session.send(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), false, producers.get(senderID), false);
} catch (Exception e) {
message.deleteFile();
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -125,9 +126,17 @@ public void setUp() throws Exception {
queue = createQueue("queue1");
}


@Test
public void testSendReceive() throws Exception {
internalTestSendReceive(64);
}

@Test
public void testSendReceiveLarge() throws Exception {
internalTestSendReceive(1024 * 1024);
}

private void internalTestSendReceive(int messageSize) throws Exception {
final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
assertEquals(AckReason.NORMAL, reason);
assertNotNull(consumer);
Expand All @@ -142,7 +151,12 @@ public void testSendReceive() throws Exception {
MessageProducer prod = sess.createProducer(queue);
MessageConsumer cons = sess.createConsumer(queue);

TextMessage msg1 = sess.createTextMessage("test");
byte[] msgs = new byte[messageSize];
for (int i = 0; i < msgs.length; i++) {
msgs[i] = RandomUtil.randomByte();
}

TextMessage msg1 = sess.createTextMessage(new String(msgs));
prod.send(msg1);
TextMessage received1 = (TextMessage)cons.receive(1000);
assertNotNull(received1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,25 @@ public void testAuditLog() throws Exception {

@Test
public void testAuditHotLogCore() throws Exception {
internalSend("CORE");
internalSend("CORE", 64);
}

@Test
public void testAuditHotLogAMQP() throws Exception {
internalSend("AMQP");
internalSend("AMQP", 64);
}

public void internalSend(String protocol) throws Exception {
@Test
public void testAuditHotLogCoreLarge() throws Exception {
internalSend("CORE", 1024 * 1024);
}

@Test
public void testAuditHotLogAMQPLarge() throws Exception {
internalSend("AMQP", 1024 * 1024);
}

public void internalSend(String protocol, int messageSize) throws Exception {
JMXConnector jmxConnector = getJmxConnector();
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
String brokerName = "0.0.0.0"; // configured e.g. in broker.xml <broker-name> element
Expand All @@ -149,7 +159,13 @@ public void internalSend(String protocol) throws Exception {
try {
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
TextMessage message = session.createTextMessage("msg1");

byte[] msgs = new byte[messageSize];
for (int i = 0; i < msgs.length; i++) {
msgs[i] = RandomUtil.randomByte();
}

TextMessage message = session.createTextMessage(new String(msgs));
message.setStringProperty("str", uniqueStr);
producer.send(message);

Expand Down

0 comments on commit fb169bc

Please sign in to comment.