Skip to content
Permalink
Browse files
Fix serializing/deserializing the message properties. Use the existin…
…g PrimitiveMap marshal/unmarshal functions.

Add additional exception handling in the MessageConsumer to pass the BadConsumerTest unit test.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)
  • Loading branch information
Jim Gomes committed Oct 8, 2014
1 parent 6d8d067 commit 310828d7c410821e3656553ff2d8695b688b5bad
Showing 4 changed files with 43 additions and 29 deletions.
@@ -84,6 +84,11 @@ public IPrimitiveMap Properties
get { return propertiesMap; }
}

internal PrimitiveMap PropertiesMap
{
get { return propertiesMap; }
set { propertiesMap = value; }
}

// NMS headers

@@ -53,13 +53,35 @@ public MessageConsumer(Session sess, AcknowledgementMode ackMode, IDestination d
{
// UNUSED_PARAM(selector); // Selectors are not currently supported

if(null == sess.Connection.Context)
if(null == sess
|| null == sess.Connection
|| null == sess.Connection.Context)
{
throw new NMSConnectionException();
}

Destination theDest = dest as Destination;

if(null == theDest)
{
throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
}
else if(null == theDest.Name)
{
throw new InvalidDestinationException("The destination object was not given a physical name.");
}
else if(theDest.IsTemporary)
{
String physicalName = theDest.Name;

if(String.IsNullOrEmpty(physicalName))
{
throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest);
}
}

this.session = sess;
this.destination = (Destination) dest;
this.destination = theDest;
this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
this.acknowledgementMode = ackMode;
}
@@ -268,7 +290,7 @@ protected virtual void HandleAsyncException(Exception e)
/// </returns>
protected virtual IMessage ToNmsMessage(byte[] msgData)
{
IMessage nmsMessage = null;
BaseMessage nmsMessage = null;
int messageType = WireFormat.MT_UNKNOWN;
int fieldType = WireFormat.MFT_NONE;
DateTime messageTimestamp = DateTime.UtcNow;
@@ -278,7 +300,7 @@ protected virtual IMessage ToNmsMessage(byte[] msgData)
MsgDeliveryMode messageDeliveryMode = MsgDeliveryMode.NonPersistent;
MsgPriority messagePriority = MsgPriority.Normal;
TimeSpan messageTimeToLive = TimeSpan.FromTicks(0);
IPrimitiveMap messageProperties = null;
byte[] messageProperties = null;
int fieldLen;
int index = 0;
string messageID = string.Empty;
@@ -338,16 +360,10 @@ protected virtual IMessage ToNmsMessage(byte[] msgData)

case WireFormat.MFT_HEADERS:
fieldLen = ReadInt(msgData, ref index);
int numProperties = ReadInt(msgData, ref index);
if(numProperties > 0)
messageProperties = new byte[fieldLen];
for(int propIndex = 0; propIndex < fieldLen; propIndex++, index++)
{
messageProperties = new PrimitiveMap();
while(numProperties-- > 0)
{
string propertyKey = ReadString(msgData, ref index);
byte[] propertyVal = ReadBytes(msgData, ref index);
messageProperties.SetBytes(propertyKey, propertyVal);
}
messageProperties[propIndex] = msgData[index];
}
break;

@@ -411,10 +427,7 @@ protected virtual IMessage ToNmsMessage(byte[] msgData)
nmsMessage.NMSType = messageNMSType;
if(null != messageProperties)
{
foreach(string propertyKey in messageProperties.Keys)
{
nmsMessage.Properties.SetBytes(propertyKey, messageProperties.GetBytes(propertyKey));
}
nmsMessage.PropertiesMap = PrimitiveMap.Unmarshal(messageProperties);
}
}
catch(InvalidOperationException)
@@ -428,7 +441,7 @@ protected virtual IMessage ToNmsMessage(byte[] msgData)

if(null != transformedMessage)
{
nmsMessage = transformedMessage;
nmsMessage = transformedMessage as BaseMessage;
}
}
}
@@ -21,6 +21,7 @@
using System.Collections.Generic;
using System.Text;
using System.Net;
using Apache.NMS.Util;

namespace Apache.NMS.ZMQ
{
@@ -135,17 +136,7 @@ public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMo
IPrimitiveMap properties = message.Properties;
if(null != properties && properties.Count > 0)
{
// Encode into a temporary buffer, and then place a single buffer into the msgDataBuilder.
List<byte> propertiesBuilder = new List<byte>();

EncodeFieldData(propertiesBuilder, propertiesBuilder.Count);
foreach(string propertyKey in properties.Keys)
{
EncodeFieldData(propertiesBuilder, propertyKey);
EncodeFieldData(propertiesBuilder, properties.GetBytes(propertyKey));
}

EncodeField(msgDataBuilder, WireFormat.MFT_HEADERS, propertiesBuilder.ToArray());
EncodeField(msgDataBuilder, WireFormat.MFT_HEADERS, ((PrimitiveMap) properties).Marshal());
}

if(message is ITextMessage)
@@ -169,6 +169,11 @@ public ITemporaryTopic CreateTemporaryTopic()
public void DeleteDestination(IDestination destination)
{
// Nothing to delete. Resources automatically disappear.
if(destination.IsTemporary)
{
destination.Dispose();
}

return;
}

0 comments on commit 310828d

Please sign in to comment.