diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index 71838744acb..11857c9ee6d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -83,6 +83,8 @@ public class LargeMessageControllerImpl implements LargeMessageController { private long packetLastPosition = -1; + private long bytesTaken = 0; + private OutputStream outStream; // There's no need to wait a synchronization @@ -315,7 +317,9 @@ public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQE @Override public LargeData take() throws InterruptedException { - return largeMessageData.take(); + LargeData largeData = largeMessageData.take(); + bytesTaken += largeData.getChunk().length; + return largeData; } /** @@ -1146,6 +1150,10 @@ private void popPacket() { } private void checkForPacket(final long index) { + if (totalSize == bytesTaken) { + return; + } + if (outStream != null) { throw new IllegalAccessError("Can't read the messageBody after setting outputStream"); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java index 598717346ea..5a7b91099b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java @@ -304,6 +304,15 @@ private void testFederatedQueueRemoteConsume(final String queueName) throws Exce @Test public void testWithLargeMessage() throws Exception { + internalTestWithLargeMessages(1); + } + + @Test + public void testWithMultipleLargeMessages() throws Exception { + internalTestWithLargeMessages(5); + } + + private void internalTestWithLargeMessages(int messageNumber) throws Exception { String queueName = getName(); FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); @@ -319,14 +328,18 @@ public void testWithLargeMessage() throws Exception { Session session1 = connection1.createSession(); Queue queue1 = session1.createQueue(queueName); MessageProducer producer = session1.createProducer(queue1); - producer.send(session1.createTextMessage(payload)); + for (int i = 0; i < messageNumber; i++) { + producer.send(session1.createTextMessage(payload)); + } connection0.start(); Session session0 = connection0.createSession(); Queue queue0 = session0.createQueue(queueName); MessageConsumer consumer0 = session0.createConsumer(queue0); - assertNotNull(consumer0.receive(60000)); + for (int i = 0; i < messageNumber; i++) { + assertNotNull(consumer0.receive(1000)); + } } } @@ -704,5 +717,4 @@ public org.apache.activemq.artemis.api.core.Message transform(org.apache.activem return message; } } - }