Skip to content

Commit

Permalink
ARTEMIS-3023 Avoid opening AMQP Large Message for final parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Dec 8, 2020
1 parent f939f49 commit 6a6a3b3
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 67 deletions.
Expand Up @@ -45,6 +45,7 @@
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.CompositeReadableBuffer;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
Expand Down Expand Up @@ -102,6 +103,9 @@ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {

private StorageManager storageManager;

/** this is used to parse the initial packets from the buffer */
CompositeReadableBuffer parsingBuffer;

public AMQPLargeMessage(long id,
long messageFormat,
TypedProperties extraProperties,
Expand Down Expand Up @@ -272,19 +276,6 @@ protected void readSavedEncoding(ByteBuf buf) {
}
}


@Override
public void finishParse() throws Exception {
openLargeMessage();
try {
this.ensureMessageDataScanned();
parsingData.rewind();
lazyDecodeApplicationProperties();
} finally {
closeLargeMessage();
}
}

@Override
public void validateFile() throws ActiveMQException {
largeBody.validateFile();
Expand Down Expand Up @@ -344,12 +335,7 @@ public void parseHeader(ReadableBuffer buffer) {
}

public void addBytes(ReadableBuffer data) throws Exception {

// We need to parse the header on the first add,
// as it will contain information if the message is durable or not
if (header == null && largeBody.getStoredBodySize() <= 0) {
parseHeader(data);
}
parseLargeMessage(data);

if (data.hasArray() && data.remaining() == data.array().length) {
//System.out.println("Received " + data.array().length + "::" + ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16));
Expand All @@ -362,6 +348,63 @@ public void addBytes(ReadableBuffer data) throws Exception {
}
}

protected void parseLargeMessage(ActiveMQBuffer data, boolean initialHeader) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
ByteBuf buffer = data.byteBuf().duplicate();
if (parsingBuffer == null) {
parsingBuffer = new CompositeReadableBuffer();
}
byte[] parsingData = new byte[buffer.readableBytes()];
buffer.readBytes(parsingData);

parsingBuffer.append(parsingData);
if (!initialHeader) {
genericParseLargeMessage();
}
}
}

protected void parseLargeMessage(byte[] data, boolean initialHeader) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
byte[] copy = new byte[data.length];
System.arraycopy(data, 0, copy, 0, data.length);
if (parsingBuffer == null) {
parsingBuffer = new CompositeReadableBuffer();
}

parsingBuffer.append(copy);
if (!initialHeader) {
genericParseLargeMessage();
}
}
}

private void genericParseLargeMessage() {
try {
parsingBuffer.position(0);
scanMessageData(parsingBuffer);
lazyDecodeApplicationProperties(parsingBuffer);
parsingBuffer = null;
} catch (RuntimeException expected) {
// this would mean the buffer is not complete yet, so we keep parsing it, until we can get enough bytes
logger.debug("The buffer for AMQP Large Mesasge was probably not complete, so an exception eventually would be expected", expected);
}
}

protected void parseLargeMessage(ReadableBuffer data) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
if (parsingBuffer == null) {
parsingBuffer = new CompositeReadableBuffer();
}

parsingBuffer.append(data.duplicate());
genericParseLargeMessage();
}
}

@Override
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) {
return getData().rewind();
Expand All @@ -374,11 +417,13 @@ public Message toMessage() {

@Override
public void addBytes(byte[] bytes) throws Exception {
parseLargeMessage(bytes, false);
largeBody.addBytes(bytes);
}

@Override
public void addBytes(ActiveMQBuffer bytes) throws Exception {
public void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception {
parseLargeMessage(bytes, initialHeader);
largeBody.addBytes(bytes);

}
Expand Down Expand Up @@ -471,7 +516,6 @@ public Message copy(final long newID, boolean isDLQOrExpiry) {
}

largeBody.copyInto(copy, bufferNewHeader, place.intValue());
copy.finishParse();
copy.releaseResources(true);
return copy;

Expand Down
Expand Up @@ -235,12 +235,7 @@ protected AMQPMessage(long messageFormat) {
this.coreMessageObjectPools = null;
}

/**
* Similarly to {@link MessageDataScanningStatus}, this method is made available only for testing
* purposes to check the message data scanning status.<br>
* Its access is not thread-safe and it shouldn't return {@code null}.
*/
public final MessageDataScanningStatus messageDataScanned() {
public final MessageDataScanningStatus getDataScanningStatus() {
return MessageDataScanningStatus.valueOf(messageDataScanned);
}

Expand Down Expand Up @@ -432,8 +427,12 @@ public final Footer getFooter() {
return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
protected <T> T scanForMessageSection(int scanStartPosition, Class...targetTypes) {
return scanForMessageSection(getData().duplicate().position(0), scanStartPosition, targetTypes);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
protected <T> T scanForMessageSection(ReadableBuffer buffer, int scanStartPosition, Class...targetTypes) {
ensureMessageDataScanned();

// In cases where we parsed out enough to know the value is not encoded in the message
Expand All @@ -442,7 +441,6 @@ protected <T> T scanForMessageSection(int scanStartPosition, Class...targetTypes
return null;
}

ReadableBuffer buffer = getData().duplicate().position(0);
final DecoderImpl decoder = TLSEncode.getDecoder();

buffer.position(scanStartPosition);
Expand Down Expand Up @@ -470,8 +468,15 @@ protected <T> T scanForMessageSection(int scanStartPosition, Class...targetTypes
}

protected ApplicationProperties lazyDecodeApplicationProperties() {
if (applicationProperties != null || applicationPropertiesPosition == VALUE_NOT_PRESENT) {
return applicationProperties;
}
return lazyDecodeApplicationProperties(getData().duplicate().position(0));
}

protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
}

return applicationProperties;
Expand Down Expand Up @@ -546,7 +551,7 @@ protected void setMessageAnnotation(Symbol annotation, Object value) {
// re-encode should be done to update the backing data with the in memory elements.

protected synchronized void ensureMessageDataScanned() {
final MessageDataScanningStatus state = MessageDataScanningStatus.valueOf(messageDataScanned);
final MessageDataScanningStatus state = getDataScanningStatus();
switch (state) {
case NOT_SCANNED:
scanMessageData();
Expand Down Expand Up @@ -583,14 +588,15 @@ protected synchronized void resetMessageData() {
}

protected synchronized void scanMessageData() {
this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
scanMessageData(getData());
}

protected synchronized void scanMessageData(ReadableBuffer data) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(getData().rewind());
decoder.setBuffer(data);

resetMessageData();

ReadableBuffer data = getData();

try {
while (data.hasRemaining()) {
int constructorPos = data.position();
Expand Down Expand Up @@ -634,6 +640,7 @@ protected synchronized void scanMessageData() {
decoder.setByteBuffer(null);
data.rewind();
}
this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
}

@Override
Expand Down
Expand Up @@ -250,7 +250,6 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
if (currentLargeMessage != null) {
currentLargeMessage.addBytes(receiver.recv());
receiver.advance();
currentLargeMessage.finishParse();
message = currentLargeMessage;
currentLargeMessage = null;
} else {
Expand Down
Expand Up @@ -172,20 +172,20 @@ public void testHasScheduledDeliveryTimeReloadPersistence() {
} catch (NullPointerException npe) {
}

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());

// Now reload from encoded data
message.reloadPersistence(encoded, null);

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());

assertTrue(message.hasScheduledDeliveryTime());

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());

message.getHeader();

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

assertTrue(message.hasScheduledDeliveryTime());
}
Expand All @@ -205,20 +205,20 @@ public void testHasScheduledDeliveryDelayReloadPersistence() {
} catch (NullPointerException npe) {
}

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());

// Now reload from encoded data
message.reloadPersistence(encoded, null);

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());

assertTrue(message.hasScheduledDeliveryTime());

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());

message.getHeader();

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

assertTrue(message.hasScheduledDeliveryTime());
}
Expand All @@ -235,20 +235,20 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
} catch (NullPointerException npe) {
}

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());

// Now reload from encoded data
message.reloadPersistence(encoded, null);

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());

assertFalse(message.hasScheduledDeliveryTime());

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());

message.getHeader();

Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

assertFalse(message.hasScheduledDeliveryTime());
}
Expand Down
Expand Up @@ -336,7 +336,7 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB
cloneFile.position(skipBytes);

if (newHeader != null) {
newMessage.addBytes(new ChannelBufferWrapper(newHeader));
newMessage.addBytes(new ChannelBufferWrapper(newHeader), true);
}
for (; ; ) {
// The buffer is reused...
Expand Down
Expand Up @@ -57,11 +57,6 @@ public static Message checkLargeMessage(Message message, StorageManager storageM
}
}

@Override
public void finishParse() throws Exception {

}

private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
Expand Down Expand Up @@ -176,7 +171,7 @@ public void addBytes(final byte[] bytes) throws Exception {
}

@Override
public void addBytes(final ActiveMQBuffer bytes) throws Exception {
public void addBytes(final ActiveMQBuffer bytes, boolean initialHeader) throws Exception {
synchronized (largeBody) {
largeBody.addBytes(bytes);
}
Expand Down Expand Up @@ -298,7 +293,6 @@ public Message copy(final long newID) {
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
newMessage.finishParse();
newMessage.releaseResources(true);
return newMessage.toMessage();

Expand Down
Expand Up @@ -66,11 +66,6 @@ public synchronized void addBytes(final byte[] bytes) {
buffer.writeBytes(bytes);
}

@Override
public void finishParse() throws Exception {

}

@Override
public void validateFile() throws ActiveMQException {

Expand All @@ -82,7 +77,7 @@ public void setStorageManager(StorageManager storageManager) {
}

@Override
public synchronized void addBytes(ActiveMQBuffer bytes) {
public synchronized void addBytes(ActiveMQBuffer bytes, boolean initialHeader) {
final int readableBytes = bytes.readableBytes();
if (buffer == null) {
buffer = Unpooled.buffer(readableBytes);
Expand Down
Expand Up @@ -34,7 +34,11 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
@Override
void addBytes(byte[] bytes) throws Exception;

void addBytes(ActiveMQBuffer bytes) throws Exception;
default void addBytes(ActiveMQBuffer bytes) throws Exception {
addBytes(bytes, false);
}

void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception;

long getMessageID();

Expand Down Expand Up @@ -67,6 +71,4 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
void setStorageManager(StorageManager storageManager);

void validateFile() throws ActiveMQException;

void finishParse() throws Exception;
}

0 comments on commit 6a6a3b3

Please sign in to comment.