From 588eb75565487ab1a5b0f8fdcb230dea4c8d718a Mon Sep 17 00:00:00 2001 From: AntonRoskvist Date: Tue, 21 Apr 2026 21:08:47 +0200 Subject: [PATCH 1/2] ARTEMIS-6009 Performance improvement when consuming large messages --- .../artemis/core/client/impl/ClientLargeMessageImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java index 411974f1494..659fb86deca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java @@ -172,6 +172,12 @@ private static class ActiveMQOutputStream extends OutputStream { public void write(int b) throws IOException { bufferOut.writeByte((byte) (b & 0xff)); } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + bufferOut.writeBytes(b, off, len); + } + } @Override From 7c9aefa41fec027c12bc0ea061ad4042ebd8c1de Mon Sep 17 00:00:00 2001 From: AntonRoskvist Date: Tue, 21 Apr 2026 21:55:22 +0200 Subject: [PATCH 2/2] ARTEMIS-6009 Added soak test --- .../soak/client/LargeMessageSoakTest.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java index 652ad41ddff..e4a58249f41 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.RandomUtil; @@ -149,5 +150,72 @@ public void testSendReceive(String protocol) throws Exception { assertEquals(0, errors.get()); } + @Test + public void testReceiveLargeMessagesPerformance() throws Exception { + final int THREADS = 5; + final int MESSAGE_COUNT = 10000; + final int MESSAGE_SIZE = 1024 * 1024; + + ExecutorService executorService = Executors.newFixedThreadPool(THREADS); + runAfter(executorService::shutdownNow); + + ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + try (Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue("TEST"))) { + + TextMessage textMessage = session.createTextMessage(RandomUtil.randomAlphaNumericString(MESSAGE_SIZE)); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + + producer.send(textMessage); + + if (i % 100 == 0) { + session.commit(); + } + + } + + session.commit(); + } + + CountDownLatch done = new CountDownLatch(THREADS); + long start = System.currentTimeMillis(); + + for (int t = 0; t < THREADS; t++) { + executorService.execute(() -> { + + try (Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(session.createQueue("TEST"))) { + + connection.start(); + + int count = 0; + while (consumer.receive(100) != null) { + if (++count >= 100) { + session.commit(); + count = 0; + } + } + + session.commit(); + + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + + }); + } + + assertTrue(done.await(5, TimeUnit.MINUTES)); + assertEquals(0, server.locateQueue("TEST").getMessageCount()); + + logger.info("All messages received in {} ms", System.currentTimeMillis() - start); + + } }