Skip to content

Commit

Permalink
ARTEMIS-2554 - Queue control browse broken with large messages
Browse files Browse the repository at this point in the history
  • Loading branch information
andytaylor committed Nov 18, 2019
1 parent 7168cc1 commit 0ab75b9
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,14 @@ public Map<String, Object> getFields(MessageReference ref) throws OpenDataExcept
Map<String, Object> rc = super.getFields(ref);
ICoreMessage m = ref.getMessage().toCore();
if (!m.isLargeMessage()) {
SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
} else {
SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
}
} else {
rc.put(CompositeDataConstants.TEXT_BODY, "");
rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
}
return rc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;

import javax.management.openmbean.CompositeData;

/**
* A LargeMessageCompressTest
* <br>
Expand All @@ -61,6 +64,66 @@ protected ServerLocator createFactory(final boolean isNetty) throws Exception {
return super.createFactory(isNetty).setCompressLargeMessage(true);
}

@Test
public void testLargeMessageCompressionNotCompressedAndBrowsed() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

ActiveMQServer server = createServer(true, isNetty());

server.start();

ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = addClientSession(sf.createSession(false, false, false));

session.createTemporaryQueue(ADDRESS, ADDRESS);

ClientProducer producer = session.createProducer(ADDRESS);

Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);

clientFile.setType(Message.TEXT_TYPE);

producer.send(clientFile);

session.commit();

session.close();

QueueControlImpl queueControl = (QueueControlImpl) server.getManagementService().getResource("queue.SimpleAddress");

CompositeData[] browse = queueControl.browse();

Assert.assertNotNull(browse);

Assert.assertEquals(browse.length, 1);

Assert.assertEquals(browse[0].get("text"), "[compressed]");

//clean up
session = addClientSession(sf.createSession(false, false, false));

session.start();

ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage msg1 = consumer.receive(1000);
Assert.assertNotNull(msg1);

for (int i = 0; i < messageSize; i++) {
byte b = msg1.getBodyBuffer().readByte();
assertEquals("position = " + i, getSamplebyte(i), b);
}

msg1.acknowledge();
session.commit();

consumer.close();

session.close();

validateNoFilesOnLargeDir();
}

@Test
public void testLargeMessageCompression() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
Expand Down

0 comments on commit 0ab75b9

Please sign in to comment.