Skip to content

Commit

Permalink
ARTEMIS-2096 Refactor AMQMessage abstraction
Browse files Browse the repository at this point in the history
Major refactoring of the AMQPMessage abstraction to resolve
some issue of message corruption still present in the code and
improve the API handling of message changes and re-encoding.

Improves handling of decoding of message sections limiting the
work to only the portions needed and ensuring the state data
is always updated with what has been done.  Fixes issues of
corrupt state on copy of message or other changes in filters.
  • Loading branch information
tabish121 authored and clebertsuconic committed Sep 26, 2018
1 parent 2453978 commit a851a8f
Show file tree
Hide file tree
Showing 16 changed files with 3,491 additions and 1,362 deletions.

Large diffs are not rendered by default.

Expand Up @@ -50,7 +50,6 @@ public int getEncodeSize(Message record) {
SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
}


/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
Expand All @@ -62,7 +61,6 @@ public void encode(ActiveMQBuffer buffer, Message record) {
record.persist(buffer);
}


@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
long id = buffer.readLong();
Expand All @@ -76,5 +74,4 @@ record = new AMQPMessage(format);
}
return record;
}

}
Expand Up @@ -63,9 +63,22 @@ public final class AMQPMessageSupport {

/**
* Attribute used to mark the Application defined delivery time assigned to the message
*
* @deprecated Use the SCHEDULED_DELIVERY_TIME value as this is not JMS specific and will be removed.
*/
@Deprecated
public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");

/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");

/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");

/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
Expand Down Expand Up @@ -226,6 +239,24 @@ public static boolean isContentType(String contentType, Message message) {
}
}

/**
* Check whether the content-type given matches the expect value.
*
* @param expected
* content type string to compare against or null if not expected to be set
* @param actual
* the AMQP content type symbol from the Properties section
*
* @return true if content type matches
*/
public static boolean isContentType(String expected, Symbol actual) {
if (expected == null) {
return actual == null;
} else {
return expected.equals(actual != null ? actual.toString() : actual);
}
}

/**
* @param contentType
* the contentType of the received message
Expand Down
Expand Up @@ -98,54 +98,60 @@
* */
public class AmqpCoreConverter {

@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return message.toCore(coreMessageObjectPools);
}

@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws Exception {
final long messageId = message.getMessageID();
final Symbol contentType = properties != null ? properties.getContentType() : null;
final String contentTypeString = contentType != null ? contentType.toString() : null;

Section body = message.getProtonMessage().getBody();
ServerJMSMessage result;

if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
result = createBytesMessage(messageId, coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
Charset charset = getCharsetForTextualContent(contentTypeString);
if (charset != null) {
result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
result = createTextMessage(messageId, coreMessageObjectPools);
} else {
result = createMessage(message.getMessageID(), coreMessageObjectPools);
result = createMessage(messageId, coreMessageObjectPools);
}
}

result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
} else if (body instanceof Data) {
Binary payload = ((Data) body).getValue();

if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
Charset charset = getCharsetForTextualContent(contentTypeString);
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());

try {
CharBuffer chars = charset.newDecoder().decode(buf);
result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
}

result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
Expand All @@ -155,35 +161,35 @@ public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools co
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) {
result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
result = createTextMessage(messageId, (String) value, coreMessageObjectPools);

result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
Binary payload = (Binary) value;

if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload, coreMessageObjectPools);
} else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}

result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
result = m;
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
} else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body);
result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally {
buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
Expand All @@ -193,30 +199,38 @@ public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools co
throw new RuntimeException("Unexpected body type: " + body.getClass());
}

TypedProperties properties = message.getExtraProperties();
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
}
result.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
processHeader(result, header);
processMessageAnnotations(result, annotations);
processApplicationProperties(result, applicationProperties);
processProperties(result, properties);
processFooter(result, footer);
processExtraProperties(result, message.getExtraProperties());

// If the JMS expiration has not yet been set...
if (header != null && result.getJMSExpiration() == 0) {
// Then lets try to set it based on the message TTL.
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}

if (ttl == 0) {
result.setJMSExpiration(0);
} else {
result.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}

populateMessage(result, message.getProtonMessage());
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());

result.encode();

return result != null ? result.getInnerMessage() : null;
return result.getInnerMessage();
}

@SuppressWarnings("unchecked")
protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
Header header = amqp.getHeader();
protected static ServerJMSMessage processHeader(ServerJMSMessage jms, Header header) throws Exception {
if (header != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER, true);

Expand Down Expand Up @@ -248,9 +262,12 @@ protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apac
jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
}

final MessageAnnotations ma = amqp.getMessageAnnotations();
if (ma != null) {
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
return jms;
}

protected static ServerJMSMessage processMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
if (annotations != null && annotations.getValue() != null) {
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
Expand All @@ -266,14 +283,33 @@ protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apac
}
}

final ApplicationProperties ap = amqp.getApplicationProperties();
if (ap != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) ap.getValue().entrySet()) {
return jms;
}

private static ServerJMSMessage processApplicationProperties(ServerJMSMessage jms, ApplicationProperties properties) throws Exception {
if (properties != null && properties.getValue() != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
setProperty(jms, entry.getKey(), entry.getValue());
}
}

final Properties properties = amqp.getProperties();
return jms;
}

private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
}
jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
}
}

return jms;
}

private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
Expand Down Expand Up @@ -317,24 +353,13 @@ protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apac
}
}

// If the jms expiration has not yet been set...
if (header != null && jms.getJMSExpiration() == 0) {
// Then lets try to set it based on the message ttl.
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}

if (ttl == 0) {
jms.setJMSExpiration(0);
} else {
jms.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
return jms;
}

final Footer fp = amqp.getFooter();
if (fp != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
if (footer != null && footer.getValue() != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
}
Expand Down
Expand Up @@ -139,5 +139,4 @@ public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> fi

return null;
}

}
Expand Up @@ -117,11 +117,9 @@ public int limit() {
@Override
public void put(ReadableBuffer buffer) {
if (buffer.hasArray()) {
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
while (buffer.hasRemaining()) {
nettyBuffer.writeByte(buffer.get());
}
nettyBuffer.writeBytes(buffer.byteBuffer());
}
}
}

0 comments on commit a851a8f

Please sign in to comment.