Skip to content

Commit

Permalink
QPID-8273: [Broker-J] Handle malformed messages
Browse files Browse the repository at this point in the history
This closes #21
  • Loading branch information
alex-rufous committed Feb 24, 2019
1 parent 36b56ed commit adb2a34
Show file tree
Hide file tree
Showing 39 changed files with 1,309 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void closeMessageStore()
{
for (StoredBDBMessage<?> message : _messages)
{
message.clear();
message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
Expand Down Expand Up @@ -993,20 +993,36 @@ public void reallocate()
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}

public long clear()
public long clear(boolean close)
{
long bytesCleared = 0;
if(_metaData != null)
if(_data != null)
{
bytesCleared += _metaData.getStorableSize();
_metaData.clearEncodedForm();
_metaData = null;
if(_data != null)
{
bytesCleared += _data.remaining();
_data.dispose();
_data = null;
}
}
if(_data != null)
if (_metaData != null)
{
bytesCleared += _data.remaining();
_data.dispose();
_data = null;
bytesCleared += _metaData.getStorableSize();
try
{
if (close)
{
_metaData.dispose();
}
else
{
_metaData.clearEncodedForm();
}
}
finally
{
_metaData = null;
}
}
return bytesCleared;
}
Expand Down Expand Up @@ -1222,7 +1238,7 @@ public synchronized boolean flowToDisk()
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
final long bytesCleared = _messageDataRef.clear();
final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
Expand All @@ -1244,11 +1260,11 @@ public synchronized void reallocate()
}
}

public synchronized void clear()
public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
_messageDataRef.clear();
_messageDataRef.clear(close);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class QueueMessages
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
public static final String MALFORMED_MESSAGE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
Expand All @@ -76,6 +77,7 @@ public class QueueMessages
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
Expand Down Expand Up @@ -326,6 +328,66 @@ public int hashCode()
};
}

/**
* Log a Queue message of the Format:
* <pre>QUE-1006 : Malformed : {0} : {1}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
{
String rawMessage = _messages.getString("MALFORMED_MESSAGE");

final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);

final String message = formatter.format(messageArguments);

return new LogMessage()
{
@Override
public String toString()
{
return message;
}

@Override
public String getLogHierarchy()
{
return MALFORMED_MESSAGE_LOG_HIERARCHY;
}

@Override
public boolean equals(final Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}

final LogMessage that = (LogMessage) o;

return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());

}

@Override
public int hashCode()
{
int result = toString().hashCode();
result = 31 * result + getLogHierarchy().hashCode();
return result;
}
};
}

/**
* Log a Queue message of the Format:
* <pre>QUE-1016 : Operation : {0}</pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ DELETED = QUE-1002 : Deleted : ID: {0}
OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}


# These are no longer in use
#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
Expand All @@ -36,7 +39,7 @@

public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerMessageImpl.class);
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");

Expand All @@ -49,6 +52,12 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
@SuppressWarnings("unused")
private volatile Collection<UUID> _resources;

private volatile ServerMessage.ValidationStatus _validationStatus = ServerMessage.ValidationStatus.UNKNOWN;

private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, ServerMessage.ValidationStatus>
_validationStatusUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
ServerMessage.ValidationStatus.class,
"_validationStatus");

public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
{
Expand Down Expand Up @@ -192,7 +201,7 @@ public QpidByteBuffer getContent(int offset, int length)
}
finally
{
if (!wasInMemory)
if (!wasInMemory && checkValid())
{
storedMessage.flowToDisk();
}
Expand All @@ -211,6 +220,44 @@ public String toString()
return "Message[" + debugIdentity() + "]";
}

@Override
public ServerMessage.ValidationStatus getValidationStatus()
{
return _validationStatus;
}

@Override
public boolean checkValid()
{
ServerMessage.ValidationStatus status;
while ((status = _validationStatus) == ServerMessage.ValidationStatus.UNKNOWN)
{
ServerMessage.ValidationStatus newStatus;
try
{
validate();
newStatus = ServerMessage.ValidationStatus.VALID;
}
catch (RuntimeException e)
{
newStatus = ServerMessage.ValidationStatus.MALFORMED;
LOGGER.debug("Malformed message '{}' detected", this, e);
}

if (_validationStatusUpdater.compareAndSet(this, status, newStatus))
{
status = newStatus;
break;
}
}
return status == ServerMessage.ValidationStatus.VALID;
}

protected void validate()
{
// noop
}

private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
implements MessageReference<X>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,15 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
Object getConnectionReference();

boolean isResourceAcceptable(TransactionLogResource resource);

boolean checkValid();

ValidationStatus getValidationStatus();

enum ValidationStatus
{
UNKNOWN,
VALID,
MALFORMED
}
}
12 changes: 12 additions & 0 deletions broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,16 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
description = "Current age of oldest message on the queue.")
long getOldestMessageAge();

@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Malformed",
description = "Total size of enqueued malformed messages.")
long getTotalMalformedBytes();

@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Malformed",
description = "Total number of enqueued malformed messages.")
long getTotalMalformedMessages();

@ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
Expand Down Expand Up @@ -572,6 +582,8 @@ MessageInfo getMessageInfoById(@Param(name = "messageId") long messageId,

QueueEntryIterator queueEntryIterator();

boolean checkValid(QueueEntry queueEntry);

enum ExpiryPolicy
{
DELETE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,30 @@ private void decode()
{
_encodedForm.reset();
}

final long recalculateEncodedSize = recalculateEncodedSize();
if (_encodedSize != recalculateEncodedSize)
{
throw new IllegalStateException(String.format(
"Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'",
_encodedSize,
recalculateEncodedSize));
}
}
}

private void decodeIfNecessary()
{
if (!_decoded)
{
decode();
_decoded = true;
try
{
decode();
}
finally
{
_decoded = true;
}
}
}

Expand Down Expand Up @@ -329,6 +344,18 @@ public long getEncodedSize()
return _encodedSize;
}

private synchronized long recalculateEncodedSize()
{
long size = 0L;
for (Map.Entry<String, AMQTypedValue> e : _properties.entrySet())
{
String key = e.getKey();
AMQTypedValue value = e.getValue();
size += EncodingUtils.encodedShortStringLength(key) + 1 + value.getEncodingSize();
}
return size;
}

public static Map<String, Object> convertToMap(final FieldTable fieldTable)
{
final Map<String, Object> map = new HashMap<>();
Expand Down Expand Up @@ -358,12 +385,17 @@ else if (val instanceof FieldTable)

public synchronized void clearEncodedForm()
{
decodeIfNecessary();

if (_encodedForm != null)
try
{
_encodedForm.dispose();
_encodedForm = null;
decodeIfNecessary();
}
finally
{
if (_encodedForm != null)
{
_encodedForm.dispose();
_encodedForm = null;
}
}
}

Expand Down Expand Up @@ -498,4 +530,9 @@ public static FieldTable convertToFieldTable(Map<String, Object> map)
return null;
}
}

public synchronized void validate()
{
decodeIfNecessary();
}
}
Loading

0 comments on commit adb2a34

Please sign in to comment.