Skip to content
Permalink
Browse files
Add support for serializing/deserializing BytesMessages.
  • Loading branch information
Jim Gomes committed Oct 9, 2014
1 parent 310828d commit 0ba057f0cc5d7562d9ad88f2a966211ab81de98c
Showing 6 changed files with 92 additions and 57 deletions.
@@ -36,12 +36,19 @@ public class BaseMessage : IMessage
private string type;
private event AcknowledgeHandler Acknowledger;
private DateTime timestamp = new DateTime();
private bool readOnlyMsgProperties = false;
private bool readOnlyMsgBody = false;

public bool ReadOnlyBody
public virtual bool ReadOnlyProperties
{
get { return readOnlyMsgBody; }
set { readOnlyMsgBody = value; }
get { return this.readOnlyMsgProperties; }
set { this.readOnlyMsgProperties = value; }
}

public virtual bool ReadOnlyBody
{
get { return this.readOnlyMsgBody; }
set { this.readOnlyMsgBody = value; }
}

// IMessage interface
@@ -155,7 +162,6 @@ public bool NMSRedelivered
set { }
}


/// <summary>
/// The destination that the consumer of this message should send replies to
/// </summary>
@@ -190,7 +196,6 @@ public string NMSType
set { type = value; }
}


public object GetObjectProperty(string name)
{
return null;
@@ -200,17 +205,23 @@ public void SetObjectProperty(string name, object value)
{
}

public virtual void OnSend()
{
this.ReadOnlyProperties = true;
this.ReadOnlyBody = true;
}

protected void FailIfReadOnlyBody()
{
if(ReadOnlyBody == true)
if(ReadOnlyBody)
{
throw new MessageNotWriteableException("Message is in Read-Only mode.");
}
}

protected void FailIfWriteOnlyBody()
{
if(ReadOnlyBody == false)
if(!ReadOnlyBody)
{
throw new MessageNotReadableException("Message is in Write-Only mode.");
}
@@ -36,6 +36,7 @@ public abstract class Destination : IDestination
protected ZmqSocket producerEndpoint = null;
protected ZmqSocket consumerEndpoint = null;
protected string destinationName;
internal byte[] rawDestinationName;

private bool disposed = false;

@@ -47,6 +48,7 @@ protected Destination(Session session, string destName)
{
this.session = session;
this.destinationName = destName;
this.rawDestinationName = Destination.encoding.GetBytes(this.destinationName);
this.session.RegisterDestination(this);
}

@@ -88,23 +90,8 @@ private void Dispose(bool disposing)
/// </summary>
protected virtual void OnDispose()
{
if(null != this.producerEndpoint)
{
if(null != this.session
&& null != this.session.Connection)
{
this.session.Connection.ReleaseProducer(this.producerEndpoint);
}

this.producerEndpoint = null;
}

if(null != this.consumerEndpoint)
{
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
this.consumerEndpoint = null;
}

DeinitSender();
DeinitReceiver();
this.session.UnregisterDestination(this);
}

@@ -190,6 +177,20 @@ internal void InitSender()
}
}

internal void DeinitSender()
{
if(null != this.producerEndpoint)
{
if(null != this.session
&& null != this.session.Connection)
{
this.session.Connection.ReleaseProducer(this.producerEndpoint);
}

this.producerEndpoint = null;
}
}

internal void InitReceiver()
{
if(null == this.consumerEndpoint)
@@ -198,34 +199,27 @@ internal void InitReceiver()

this.consumerEndpoint = connection.GetConsumer();
// Must subscribe first before connecting to the endpoint binding
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
this.consumerEndpoint.Subscribe(this.rawDestinationName);
this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
}
}

internal void Subscribe(string prefixName)
{
InitReceiver();
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
}

internal void Unsubscribe(string prefixName)
internal void DeinitReceiver()
{
if(null != this.consumerEndpoint)
{
this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
this.consumerEndpoint = null;
}
}

internal SendStatus Send(string msg)
{
Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
return this.producerEndpoint.Send(msg, Destination.encoding);
}

internal SendStatus Send(byte[] buffer)
{
Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
return this.producerEndpoint.Send(buffer);
}

@@ -246,20 +240,6 @@ internal byte[] ReceiveBytes(SocketFlags flags, out int size)
this.InitReceiver();
return this.consumerEndpoint.Receive(null, flags, out size);
}

internal Frame ReceiveFrame()
{
// TODO: Implement
this.InitReceiver();
return null;
}

internal ZmqMessage ReceiveMessage()
{
// TODO: Implement
this.InitReceiver();
return null;
}
}
}

@@ -40,7 +40,6 @@ public class MessageConsumer : IMessageConsumer
private object asyncDeliveryLock = new object();
private bool asyncDelivery = false;
private bool asyncInit = false;
private byte[] rawDestinationName;

private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -82,7 +81,6 @@ public MessageConsumer(Session sess, AcknowledgementMode ackMode, IDestination d

this.session = sess;
this.destination = theDest;
this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
this.acknowledgementMode = ackMode;
}

@@ -145,7 +143,7 @@ public IMessage Receive(TimeSpan timeout)
if(size > 0)
{
// Strip off the subscribed destination name.
int receivedMsgIndex = this.rawDestinationName.Length;
int receivedMsgIndex = this.destination.rawDestinationName.Length;
int msgLength = receivedMsg.Length - receivedMsgIndex;
byte[] msgContent = new byte[msgLength];

@@ -406,6 +404,14 @@ protected virtual IMessage ToNmsMessage(byte[] msgData)
}
break;

case WireFormat.MT_BYTESMESSAGE:
nmsMessage = new BytesMessage();
if(null != messageBody)
{
((BytesMessage) nmsMessage).Content = messageBody;
}
break;

case WireFormat.MT_UNKNOWN:
default:
break;
@@ -444,6 +450,9 @@ protected virtual IMessage ToNmsMessage(byte[] msgData)
nmsMessage = transformedMessage as BaseMessage;
}
}

nmsMessage.ReadOnlyBody = true;
nmsMessage.ReadOnlyProperties = true;
}

return nmsMessage;
@@ -19,8 +19,8 @@

using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Text;
using Apache.NMS.Util;

namespace Apache.NMS.ZMQ
@@ -48,13 +48,35 @@ public ProducerTransformerDelegate ProducerTransformer

public MessageProducer(Session sess, IDestination dest)
{
if(null == sess.Connection.Context)
if(null == sess
|| null == sess.Connection
|| null == sess.Connection.Context)
{
throw new NMSConnectionException();
}

Destination theDest = dest as Destination;

if(null == theDest)
{
throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
}
else if(null == theDest.Name)
{
throw new InvalidDestinationException("The destination object was not given a physical name.");
}
else if(theDest.IsTemporary)
{
String physicalName = theDest.Name;

if(String.IsNullOrEmpty(physicalName))
{
throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest);
}
}

this.session = sess;
this.destination = (Destination) dest;
this.destination = theDest;
this.destination.InitSender();
}

@@ -150,6 +172,17 @@ public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMo
EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
}
}
else if(message is IBytesMessage)
{
EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE);
// Append the message text body to the msg.
byte[] msgBody = ((IBytesMessage) message).Content;

if(null != msgBody)
{
EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
}
}
else
{
// TODO: Add support for more message types
@@ -158,6 +191,8 @@ public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMo

// Put the sentinal field marker.
EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);

((BaseMessage) message).OnSend();
theDest.Send(msgDataBuilder.ToArray());
}

@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
public class TemporaryQueue : Destination, ITemporaryQueue
{
public TemporaryQueue(Session session)
: base(session, Guid.NewGuid().ToString())
: base(session, "TEMPQUEUE." + Guid.NewGuid().ToString())
{
}

@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
public class TemporaryTopic : Destination, ITemporaryTopic
{
public TemporaryTopic(Session session)
: base(session, Guid.NewGuid().ToString())
: base(session, "TEMPTOPIC." + Guid.NewGuid().ToString())
{
}

0 comments on commit 0ba057f

Please sign in to comment.