Skip to content
Permalink
Browse files
Implemented basic message producer. Removed unnecessary ZmqMessage. T…
…he normal message types will be directly supported once the wire format encoding is added.

Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)
  • Loading branch information
Jim Gomes committed Jul 12, 2011
1 parent a56c3ba commit 7f0cc164bc0e8b9b52b9e2a2fb2a9c11d2e9d15c
Showing 9 changed files with 55 additions and 257 deletions.
@@ -27,8 +27,6 @@ namespace Apache.NMS.ZMQ
public class Connection : IConnection
{
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private IMessageConverter messageConverter = new DefaultMessageConverter();

private IRedeliveryPolicy redeliveryPolicy;
private ConnectionMetaData metaData = null;
private bool closed = true;
@@ -45,7 +43,7 @@ public class Connection : IConnection
/// </summary>
public void Start()
{
CheckConnected();
closed = false;
}

/// <summary>
@@ -54,15 +52,15 @@ public void Start()
/// </summary>
public bool IsStarted
{
get { return true; }
get { return !closed; }
}

/// <summary>
/// Stop message delivery for this connection.
/// </summary>
public void Stop()
{
CheckConnected();
closed = true;
}

/// <summary>
@@ -78,13 +76,17 @@ public ISession CreateSession()
/// </summary>
public ISession CreateSession(AcknowledgementMode mode)
{
CheckConnected();
return new Session(this, mode);
}

public void Dispose()
{
closed = true;
Close();
}

public void Close()
{
Stop();
}

/// <summary>
@@ -102,12 +104,6 @@ public AcknowledgementMode AcknowledgementMode
set { acknowledgementMode = value; }
}

public IMessageConverter MessageConverter
{
get { return messageConverter; }
set { messageConverter = value; }
}

/// <summary>
/// Get/or set the broker Uri.
/// </summary>
@@ -123,10 +119,7 @@ public Uri BrokerUri
public string ClientId
{
get { return clientId; }
set
{
clientId = value;
}
set { clientId = value; }
}

/// <summary>
@@ -157,10 +150,7 @@ public ProducerTransformerDelegate ProducerTransformer
/// </summary>
static internal ZContext Context
{
get
{
return _context;
}
get { return _context; }
}

/// <summary>
@@ -188,20 +178,6 @@ public IConnectionMetaData MetaData
/// </summary>
public event ConnectionResumedListener ConnectionResumedListener;

protected void CheckConnected()
{
closed = false;
if(null == messageConverter)
{
throw new NMSException("Context Not Created");
}
}

public void Close()
{
Dispose();
}

public void HandleException(System.Exception e)
{
if(ExceptionListener != null && !this.closed)

This file was deleted.

This file was deleted.

@@ -19,9 +19,9 @@
using System.Text;
using System.Threading;
using Apache.NMS.Util;
using ZSendRecvOpt = ZMQ.SendRecvOpt;
using ZSocket = ZMQ.Socket;
using ZSocketType = ZMQ.SocketType;
using ZSendRecvOpt = ZMQ.SendRecvOpt;

namespace Apache.NMS.ZMQ
{
@@ -114,16 +114,8 @@ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode,
/// </returns>
public IMessage Receive()
{
IMessage nmsMessage = null;
if(null != messageSubscriber)
{
string messageText = messageSubscriber.Recv(Encoding.ASCII, ZSendRecvOpt.NOBLOCK);
if(!string.IsNullOrEmpty(messageText))
{
nmsMessage = ToNmsMessage(messageText);
}
}
return nmsMessage;
// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
return ToNmsMessage(messageSubscriber.Recv(Encoding.ASCII, ZSendRecvOpt.NOBLOCK));
}

/// <summary>
@@ -134,16 +126,8 @@ public IMessage Receive()
/// </returns>
public IMessage Receive(TimeSpan timeout)
{
IMessage nmsMessage = null;
if(null != messageSubscriber)
{
string messageText = messageSubscriber.Recv(Encoding.ASCII, timeout.Milliseconds);
if(!string.IsNullOrEmpty(messageText))
{
nmsMessage = ToNmsMessage(messageText);
}
}
return nmsMessage;
// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
return ToNmsMessage(messageSubscriber.Recv(Encoding.ASCII, timeout.Milliseconds));
}

/// <summary>
@@ -257,60 +241,36 @@ protected virtual void HandleAsyncException(Exception e)
/// <returns>
/// nms message object
/// </returns>
protected virtual IMessage ToNmsMessage(ZmqMessage message)
protected virtual IMessage ToNmsMessage(string messageText)
{
IMessage ReturnValue = null;
if(null == message)
IMessage nmsMessage = new TextMessage(messageText);

try
{
return ReturnValue;
nmsMessage.NMSMessageId = "";
nmsMessage.NMSDestination = new Queue(contextBinding);
nmsMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
nmsMessage.NMSPriority = MsgPriority.Normal;
nmsMessage.NMSTimestamp = DateTime.Now;
nmsMessage.NMSTimeToLive = new TimeSpan(0);
nmsMessage.NMSType = "";
}
catch(InvalidOperationException)
{
// Log error
}

ReturnValue = session.MessageConverter.ToNmsMessage(message);

if(this.ConsumerTransformer != null)
if(null != this.ConsumerTransformer)
{
IMessage newMessage = ConsumerTransformer(this.session, this, ReturnValue);
if(newMessage != null)
IMessage transformedMessage = ConsumerTransformer(this.session, this, nmsMessage);

if(null != transformedMessage)
{
ReturnValue = newMessage;
nmsMessage = transformedMessage;
}
}
return ReturnValue;
}

/// <summary>
/// Create nms message object
/// </summary>
/// <param name="message">
/// message text
/// </param>
/// <returns>
/// nms message object
/// </returns>
protected virtual IMessage ToNmsMessage(string messageText)
{
IMessage ReturnValue = null;
ZmqMessage message = ToZmqMessage(messageText);
ReturnValue = ToNmsMessage(message);
return ReturnValue;
}

/// <summary>
/// Create zmq message object
/// </summary>
/// <param name="messageText">
/// message text
/// </param>
/// <returns>
/// zmq message object
/// </returns>
private ZmqMessage ToZmqMessage(string messageText)
{
ZmqMessage message = new ZmqMessage();
message.Destination = new Queue(this.contextBinding);
message.ClientId = session.Connection.ClientId;
message.Text = messageText;
return message;
return nmsMessage;
}
}
}
@@ -84,7 +84,18 @@ public void Send(IDestination destination, IMessage message)

public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
messageProducer.Send();
if(null != this.ProducerTransformer)
{
IMessage transformedMessage = ProducerTransformer(this.session, this, message);

if(null != transformedMessage)
{
message = transformedMessage;
}
}

// TODO: Support encoding of all message types + all meta data (e.g., headers and properties)
messageProducer.Send(((ITextMessage) message).Text, Encoding.ASCII);
}

public void Dispose()

0 comments on commit 7f0cc16

Please sign in to comment.