diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 64ff31fb288..688c637fc63 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -45,6 +45,7 @@
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.codec.CompositeReadableBuffer;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
@@ -102,6 +103,9 @@ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
private StorageManager storageManager;
+ /** this is used to parse the initial packets from the buffer */
+ CompositeReadableBuffer parsingBuffer;
+
public AMQPLargeMessage(long id,
long messageFormat,
TypedProperties extraProperties,
@@ -272,19 +276,6 @@ protected void readSavedEncoding(ByteBuf buf) {
}
}
-
- @Override
- public void finishParse() throws Exception {
- openLargeMessage();
- try {
- this.ensureMessageDataScanned();
- parsingData.rewind();
- lazyDecodeApplicationProperties();
- } finally {
- closeLargeMessage();
- }
- }
-
@Override
public void validateFile() throws ActiveMQException {
largeBody.validateFile();
@@ -344,12 +335,7 @@ public void parseHeader(ReadableBuffer buffer) {
}
public void addBytes(ReadableBuffer data) throws Exception {
-
- // We need to parse the header on the first add,
- // as it will contain information if the message is durable or not
- if (header == null && largeBody.getStoredBodySize() <= 0) {
- parseHeader(data);
- }
+ parseLargeMessage(data);
if (data.hasArray() && data.remaining() == data.array().length) {
//System.out.println("Received " + data.array().length + "::" + ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16));
@@ -362,6 +348,63 @@ public void addBytes(ReadableBuffer data) throws Exception {
}
}
+ protected void parseLargeMessage(ActiveMQBuffer data, boolean initialHeader) {
+ MessageDataScanningStatus status = getDataScanningStatus();
+ if (status == MessageDataScanningStatus.NOT_SCANNED) {
+ ByteBuf buffer = data.byteBuf().duplicate();
+ if (parsingBuffer == null) {
+ parsingBuffer = new CompositeReadableBuffer();
+ }
+ byte[] parsingData = new byte[buffer.readableBytes()];
+ buffer.readBytes(parsingData);
+
+ parsingBuffer.append(parsingData);
+ if (!initialHeader) {
+ genericParseLargeMessage();
+ }
+ }
+ }
+
+ protected void parseLargeMessage(byte[] data, boolean initialHeader) {
+ MessageDataScanningStatus status = getDataScanningStatus();
+ if (status == MessageDataScanningStatus.NOT_SCANNED) {
+ byte[] copy = new byte[data.length];
+ System.arraycopy(data, 0, copy, 0, data.length);
+ if (parsingBuffer == null) {
+ parsingBuffer = new CompositeReadableBuffer();
+ }
+
+ parsingBuffer.append(copy);
+ if (!initialHeader) {
+ genericParseLargeMessage();
+ }
+ }
+ }
+
+ private void genericParseLargeMessage() {
+ try {
+ parsingBuffer.position(0);
+ scanMessageData(parsingBuffer);
+ lazyDecodeApplicationProperties(parsingBuffer);
+ parsingBuffer = null;
+ } catch (RuntimeException expected) {
+ // this would mean the buffer is not complete yet, so we keep parsing it, until we can get enough bytes
+ logger.debug("The buffer for AMQP Large Mesasge was probably not complete, so an exception eventually would be expected", expected);
+ }
+ }
+
+ protected void parseLargeMessage(ReadableBuffer data) {
+ MessageDataScanningStatus status = getDataScanningStatus();
+ if (status == MessageDataScanningStatus.NOT_SCANNED) {
+ if (parsingBuffer == null) {
+ parsingBuffer = new CompositeReadableBuffer();
+ }
+
+ parsingBuffer.append(data.duplicate());
+ genericParseLargeMessage();
+ }
+ }
+
@Override
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) {
return getData().rewind();
@@ -374,11 +417,13 @@ public Message toMessage() {
@Override
public void addBytes(byte[] bytes) throws Exception {
+ parseLargeMessage(bytes, false);
largeBody.addBytes(bytes);
}
@Override
- public void addBytes(ActiveMQBuffer bytes) throws Exception {
+ public void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception {
+ parseLargeMessage(bytes, initialHeader);
largeBody.addBytes(bytes);
}
@@ -471,7 +516,6 @@ public Message copy(final long newID, boolean isDLQOrExpiry) {
}
largeBody.copyInto(copy, bufferNewHeader, place.intValue());
- copy.finishParse();
copy.releaseResources(true);
return copy;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 06b6dd13bef..0bcf57257ff 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -235,12 +235,7 @@ protected AMQPMessage(long messageFormat) {
this.coreMessageObjectPools = null;
}
- /**
- * Similarly to {@link MessageDataScanningStatus}, this method is made available only for testing
- * purposes to check the message data scanning status.
- * Its access is not thread-safe and it shouldn't return {@code null}.
- */
- public final MessageDataScanningStatus messageDataScanned() {
+ public final MessageDataScanningStatus getDataScanningStatus() {
return MessageDataScanningStatus.valueOf(messageDataScanned);
}
@@ -432,8 +427,12 @@ public final Footer getFooter() {
return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
protected T scanForMessageSection(int scanStartPosition, Class...targetTypes) {
+ return scanForMessageSection(getData().duplicate().position(0), scanStartPosition, targetTypes);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected T scanForMessageSection(ReadableBuffer buffer, int scanStartPosition, Class...targetTypes) {
ensureMessageDataScanned();
// In cases where we parsed out enough to know the value is not encoded in the message
@@ -442,7 +441,6 @@ protected T scanForMessageSection(int scanStartPosition, Class...targetTypes
return null;
}
- ReadableBuffer buffer = getData().duplicate().position(0);
final DecoderImpl decoder = TLSEncode.getDecoder();
buffer.position(scanStartPosition);
@@ -470,8 +468,15 @@ protected T scanForMessageSection(int scanStartPosition, Class...targetTypes
}
protected ApplicationProperties lazyDecodeApplicationProperties() {
+ if (applicationProperties != null || applicationPropertiesPosition == VALUE_NOT_PRESENT) {
+ return applicationProperties;
+ }
+ return lazyDecodeApplicationProperties(getData().duplicate().position(0));
+ }
+
+ protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
- applicationProperties = scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
+ applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
}
return applicationProperties;
@@ -546,7 +551,7 @@ protected void setMessageAnnotation(Symbol annotation, Object value) {
// re-encode should be done to update the backing data with the in memory elements.
protected synchronized void ensureMessageDataScanned() {
- final MessageDataScanningStatus state = MessageDataScanningStatus.valueOf(messageDataScanned);
+ final MessageDataScanningStatus state = getDataScanningStatus();
switch (state) {
case NOT_SCANNED:
scanMessageData();
@@ -583,14 +588,15 @@ protected synchronized void resetMessageData() {
}
protected synchronized void scanMessageData() {
- this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
+ scanMessageData(getData());
+ }
+
+ protected synchronized void scanMessageData(ReadableBuffer data) {
DecoderImpl decoder = TLSEncode.getDecoder();
- decoder.setBuffer(getData().rewind());
+ decoder.setBuffer(data);
resetMessageData();
- ReadableBuffer data = getData();
-
try {
while (data.hasRemaining()) {
int constructorPos = data.position();
@@ -634,6 +640,7 @@ protected synchronized void scanMessageData() {
decoder.setByteBuffer(null);
data.rewind();
}
+ this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
}
@Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 797a3c03ea6..31b62f2269a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -250,7 +250,6 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
if (currentLargeMessage != null) {
currentLargeMessage.addBytes(receiver.recv());
receiver.advance();
- currentLargeMessage.finishParse();
message = currentLargeMessage;
currentLargeMessage = null;
} else {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 0f7f76d4441..57e7d9be46f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -172,20 +172,20 @@ public void testHasScheduledDeliveryTimeReloadPersistence() {
} catch (NullPointerException npe) {
}
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
}
@@ -205,20 +205,20 @@ public void testHasScheduledDeliveryDelayReloadPersistence() {
} catch (NullPointerException npe) {
}
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
}
@@ -235,20 +235,20 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
} catch (NullPointerException npe) {
}
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertFalse(message.hasScheduledDeliveryTime());
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertFalse(message.hasScheduledDeliveryTime());
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 0d498191158..9e06552ea65 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -336,7 +336,7 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB
cloneFile.position(skipBytes);
if (newHeader != null) {
- newMessage.addBytes(new ChannelBufferWrapper(newHeader));
+ newMessage.addBytes(new ChannelBufferWrapper(newHeader), true);
}
for (; ; ) {
// The buffer is reused...
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 3fadb167872..2ee61148f6d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -57,11 +57,6 @@ public static Message checkLargeMessage(Message message, StorageManager storageM
}
}
- @Override
- public void finishParse() throws Exception {
-
- }
-
private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
@@ -176,7 +171,7 @@ public void addBytes(final byte[] bytes) throws Exception {
}
@Override
- public void addBytes(final ActiveMQBuffer bytes) throws Exception {
+ public void addBytes(final ActiveMQBuffer bytes, boolean initialHeader) throws Exception {
synchronized (largeBody) {
largeBody.addBytes(bytes);
}
@@ -298,7 +293,6 @@ public Message copy(final long newID) {
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
- newMessage.finishParse();
newMessage.releaseResources(true);
return newMessage.toMessage();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index f7684a82b9c..69ea514b8ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -66,11 +66,6 @@ public synchronized void addBytes(final byte[] bytes) {
buffer.writeBytes(bytes);
}
- @Override
- public void finishParse() throws Exception {
-
- }
-
@Override
public void validateFile() throws ActiveMQException {
@@ -82,7 +77,7 @@ public void setStorageManager(StorageManager storageManager) {
}
@Override
- public synchronized void addBytes(ActiveMQBuffer bytes) {
+ public synchronized void addBytes(ActiveMQBuffer bytes, boolean initialHeader) {
final int readableBytes = bytes.readableBytes();
if (buffer == null) {
buffer = Unpooled.buffer(readableBytes);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index d9eb996f0d2..6375f3b2c17 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -34,7 +34,11 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
@Override
void addBytes(byte[] bytes) throws Exception;
- void addBytes(ActiveMQBuffer bytes) throws Exception;
+ default void addBytes(ActiveMQBuffer bytes) throws Exception {
+ addBytes(bytes, false);
+ }
+
+ void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception;
long getMessageID();
@@ -67,6 +71,4 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
void setStorageManager(StorageManager storageManager);
void validateFile() throws ActiveMQException;
-
- void finishParse() throws Exception;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
index 4f5315c0ab1..f6e816628d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
@@ -59,7 +59,7 @@ public void durableMessageDataNotScannedOnRestartTest() throws Exception {
final Message message = next.getMessage();
Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
amqpMessage = (AMQPMessage) message;
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.getDataScanningStatus());
}
AmqpClient client = createAmqpClient();
@@ -75,7 +75,7 @@ public void durableMessageDataNotScannedOnRestartTest() throws Exception {
assertEquals(1, afterRestartQueueView.getMessageCount());
- Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.messageDataScanned());
+ Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.getDataScanningStatus());
receive.accept();