From 6a6a3b3863d2defb30bdaa021d149d8e0ec53eec Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 7 Dec 2020 14:53:59 -0500 Subject: [PATCH] ARTEMIS-3023 Avoid opening AMQP Large Message for final parsing --- .../amqp/broker/AMQPLargeMessage.java | 86 ++++++++++++++----- .../protocol/amqp/broker/AMQPMessage.java | 35 +++++--- .../amqp/proton/ProtonAbstractReceiver.java | 1 - .../protocol/amqp/broker/AMQPMessageTest.java | 24 +++--- .../persistence/impl/journal/LargeBody.java | 2 +- .../impl/journal/LargeServerMessageImpl.java | 8 +- .../nullpm/NullStorageLargeServerMessage.java | 7 +- .../core/server/LargeServerMessage.java | 8 +- .../amqp/journal/AmqpJournalLoadingTest.java | 4 +- 9 files changed, 108 insertions(+), 67 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 64ff31fb288..688c637fc63 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -45,6 +45,7 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.codec.CompositeReadableBuffer; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.TypeConstructor; @@ -102,6 +103,9 @@ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { private StorageManager storageManager; + /** this is used to parse the initial packets from the buffer */ + CompositeReadableBuffer parsingBuffer; + public AMQPLargeMessage(long id, long messageFormat, TypedProperties extraProperties, @@ -272,19 +276,6 @@ protected void readSavedEncoding(ByteBuf buf) { } } - - @Override - public void finishParse() throws Exception { - openLargeMessage(); - try { - this.ensureMessageDataScanned(); - parsingData.rewind(); - lazyDecodeApplicationProperties(); - } finally { - closeLargeMessage(); - } - } - @Override public void validateFile() throws ActiveMQException { largeBody.validateFile(); @@ -344,12 +335,7 @@ public void parseHeader(ReadableBuffer buffer) { } public void addBytes(ReadableBuffer data) throws Exception { - - // We need to parse the header on the first add, - // as it will contain information if the message is durable or not - if (header == null && largeBody.getStoredBodySize() <= 0) { - parseHeader(data); - } + parseLargeMessage(data); if (data.hasArray() && data.remaining() == data.array().length) { //System.out.println("Received " + data.array().length + "::" + ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16)); @@ -362,6 +348,63 @@ public void addBytes(ReadableBuffer data) throws Exception { } } + protected void parseLargeMessage(ActiveMQBuffer data, boolean initialHeader) { + MessageDataScanningStatus status = getDataScanningStatus(); + if (status == MessageDataScanningStatus.NOT_SCANNED) { + ByteBuf buffer = data.byteBuf().duplicate(); + if (parsingBuffer == null) { + parsingBuffer = new CompositeReadableBuffer(); + } + byte[] parsingData = new byte[buffer.readableBytes()]; + buffer.readBytes(parsingData); + + parsingBuffer.append(parsingData); + if (!initialHeader) { + genericParseLargeMessage(); + } + } + } + + protected void parseLargeMessage(byte[] data, boolean initialHeader) { + MessageDataScanningStatus status = getDataScanningStatus(); + if (status == MessageDataScanningStatus.NOT_SCANNED) { + byte[] copy = new byte[data.length]; + System.arraycopy(data, 0, copy, 0, data.length); + if (parsingBuffer == null) { + parsingBuffer = new CompositeReadableBuffer(); + } + + parsingBuffer.append(copy); + if (!initialHeader) { + genericParseLargeMessage(); + } + } + } + + private void genericParseLargeMessage() { + try { + parsingBuffer.position(0); + scanMessageData(parsingBuffer); + lazyDecodeApplicationProperties(parsingBuffer); + parsingBuffer = null; + } catch (RuntimeException expected) { + // this would mean the buffer is not complete yet, so we keep parsing it, until we can get enough bytes + logger.debug("The buffer for AMQP Large Mesasge was probably not complete, so an exception eventually would be expected", expected); + } + } + + protected void parseLargeMessage(ReadableBuffer data) { + MessageDataScanningStatus status = getDataScanningStatus(); + if (status == MessageDataScanningStatus.NOT_SCANNED) { + if (parsingBuffer == null) { + parsingBuffer = new CompositeReadableBuffer(); + } + + parsingBuffer.append(data.duplicate()); + genericParseLargeMessage(); + } + } + @Override public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) { return getData().rewind(); @@ -374,11 +417,13 @@ public Message toMessage() { @Override public void addBytes(byte[] bytes) throws Exception { + parseLargeMessage(bytes, false); largeBody.addBytes(bytes); } @Override - public void addBytes(ActiveMQBuffer bytes) throws Exception { + public void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception { + parseLargeMessage(bytes, initialHeader); largeBody.addBytes(bytes); } @@ -471,7 +516,6 @@ public Message copy(final long newID, boolean isDLQOrExpiry) { } largeBody.copyInto(copy, bufferNewHeader, place.intValue()); - copy.finishParse(); copy.releaseResources(true); return copy; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 06b6dd13bef..0bcf57257ff 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -235,12 +235,7 @@ protected AMQPMessage(long messageFormat) { this.coreMessageObjectPools = null; } - /** - * Similarly to {@link MessageDataScanningStatus}, this method is made available only for testing - * purposes to check the message data scanning status.
- * Its access is not thread-safe and it shouldn't return {@code null}. - */ - public final MessageDataScanningStatus messageDataScanned() { + public final MessageDataScanningStatus getDataScanningStatus() { return MessageDataScanningStatus.valueOf(messageDataScanned); } @@ -432,8 +427,12 @@ public final Footer getFooter() { return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class); } - @SuppressWarnings({ "unchecked", "rawtypes" }) protected T scanForMessageSection(int scanStartPosition, Class...targetTypes) { + return scanForMessageSection(getData().duplicate().position(0), scanStartPosition, targetTypes); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected T scanForMessageSection(ReadableBuffer buffer, int scanStartPosition, Class...targetTypes) { ensureMessageDataScanned(); // In cases where we parsed out enough to know the value is not encoded in the message @@ -442,7 +441,6 @@ protected T scanForMessageSection(int scanStartPosition, Class...targetTypes return null; } - ReadableBuffer buffer = getData().duplicate().position(0); final DecoderImpl decoder = TLSEncode.getDecoder(); buffer.position(scanStartPosition); @@ -470,8 +468,15 @@ protected T scanForMessageSection(int scanStartPosition, Class...targetTypes } protected ApplicationProperties lazyDecodeApplicationProperties() { + if (applicationProperties != null || applicationPropertiesPosition == VALUE_NOT_PRESENT) { + return applicationProperties; + } + return lazyDecodeApplicationProperties(getData().duplicate().position(0)); + } + + protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) { if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) { - applicationProperties = scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class); + applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class); } return applicationProperties; @@ -546,7 +551,7 @@ protected void setMessageAnnotation(Symbol annotation, Object value) { // re-encode should be done to update the backing data with the in memory elements. protected synchronized void ensureMessageDataScanned() { - final MessageDataScanningStatus state = MessageDataScanningStatus.valueOf(messageDataScanned); + final MessageDataScanningStatus state = getDataScanningStatus(); switch (state) { case NOT_SCANNED: scanMessageData(); @@ -583,14 +588,15 @@ protected synchronized void resetMessageData() { } protected synchronized void scanMessageData() { - this.messageDataScanned = MessageDataScanningStatus.SCANNED.code; + scanMessageData(getData()); + } + + protected synchronized void scanMessageData(ReadableBuffer data) { DecoderImpl decoder = TLSEncode.getDecoder(); - decoder.setBuffer(getData().rewind()); + decoder.setBuffer(data); resetMessageData(); - ReadableBuffer data = getData(); - try { while (data.hasRemaining()) { int constructorPos = data.position(); @@ -634,6 +640,7 @@ protected synchronized void scanMessageData() { decoder.setByteBuffer(null); data.rewind(); } + this.messageDataScanned = MessageDataScanningStatus.SCANNED.code; } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 797a3c03ea6..31b62f2269a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -250,7 +250,6 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { if (currentLargeMessage != null) { currentLargeMessage.addBytes(receiver.recv()); receiver.advance(); - currentLargeMessage.finishParse(); message = currentLargeMessage; currentLargeMessage = null; } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 0f7f76d4441..57e7d9be46f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -172,20 +172,20 @@ public void testHasScheduledDeliveryTimeReloadPersistence() { } catch (NullPointerException npe) { } - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus()); // Now reload from encoded data message.reloadPersistence(encoded, null); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); assertTrue(message.hasScheduledDeliveryTime()); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); message.getHeader(); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); assertTrue(message.hasScheduledDeliveryTime()); } @@ -205,20 +205,20 @@ public void testHasScheduledDeliveryDelayReloadPersistence() { } catch (NullPointerException npe) { } - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus()); // Now reload from encoded data message.reloadPersistence(encoded, null); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); assertTrue(message.hasScheduledDeliveryTime()); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); message.getHeader(); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); assertTrue(message.hasScheduledDeliveryTime()); } @@ -235,20 +235,20 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() { } catch (NullPointerException npe) { } - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus()); // Now reload from encoded data message.reloadPersistence(encoded, null); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); assertFalse(message.hasScheduledDeliveryTime()); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); message.getHeader(); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); assertFalse(message.hasScheduledDeliveryTime()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index 0d498191158..9e06552ea65 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -336,7 +336,7 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB cloneFile.position(skipBytes); if (newHeader != null) { - newMessage.addBytes(new ChannelBufferWrapper(newHeader)); + newMessage.addBytes(new ChannelBufferWrapper(newHeader), true); } for (; ; ) { // The buffer is reused... diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 3fadb167872..2ee61148f6d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -57,11 +57,6 @@ public static Message checkLargeMessage(Message message, StorageManager storageM } } - @Override - public void finishParse() throws Exception { - - } - private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception { ICoreMessage coreMessage = message.toCore(); LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage); @@ -176,7 +171,7 @@ public void addBytes(final byte[] bytes) throws Exception { } @Override - public void addBytes(final ActiveMQBuffer bytes) throws Exception { + public void addBytes(final ActiveMQBuffer bytes, boolean initialHeader) throws Exception { synchronized (largeBody) { largeBody.addBytes(bytes); } @@ -298,7 +293,6 @@ public Message copy(final long newID) { try { LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); largeBody.copyInto(newMessage); - newMessage.finishParse(); newMessage.releaseResources(true); return newMessage.toMessage(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index f7684a82b9c..69ea514b8ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -66,11 +66,6 @@ public synchronized void addBytes(final byte[] bytes) { buffer.writeBytes(bytes); } - @Override - public void finishParse() throws Exception { - - } - @Override public void validateFile() throws ActiveMQException { @@ -82,7 +77,7 @@ public void setStorageManager(StorageManager storageManager) { } @Override - public synchronized void addBytes(ActiveMQBuffer bytes) { + public synchronized void addBytes(ActiveMQBuffer bytes, boolean initialHeader) { final int readableBytes = bytes.readableBytes(); if (buffer == null) { buffer = Unpooled.buffer(readableBytes); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index d9eb996f0d2..6375f3b2c17 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -34,7 +34,11 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { @Override void addBytes(byte[] bytes) throws Exception; - void addBytes(ActiveMQBuffer bytes) throws Exception; + default void addBytes(ActiveMQBuffer bytes) throws Exception { + addBytes(bytes, false); + } + + void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception; long getMessageID(); @@ -67,6 +71,4 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { void setStorageManager(StorageManager storageManager); void validateFile() throws ActiveMQException; - - void finishParse() throws Exception; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java index 4f5315c0ab1..f6e816628d3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java @@ -59,7 +59,7 @@ public void durableMessageDataNotScannedOnRestartTest() throws Exception { final Message message = next.getMessage(); Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class)); amqpMessage = (AMQPMessage) message; - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.getDataScanningStatus()); } AmqpClient client = createAmqpClient(); @@ -75,7 +75,7 @@ public void durableMessageDataNotScannedOnRestartTest() throws Exception { assertEquals(1, afterRestartQueueView.getMessageCount()); - Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.messageDataScanned()); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.getDataScanningStatus()); receive.accept();