Skip to content
Permalink
Browse files
Copy the ignoreExpiration implementation from the OpenWire provider.
  • Loading branch information
Jim Gomes committed May 23, 2014
1 parent 0716997 commit 31b93cee8dbb551d8e0d2b1dd80ca12d9d23da80
Showing 4 changed files with 444 additions and 90 deletions.
@@ -15,6 +15,7 @@
* limitations under the License.
*/
using System;
using System.Collections.Specialized;
using System.Threading;
using System.Collections.Generic;
using Apache.NMS.Stomp.Commands;
@@ -58,14 +59,45 @@ public class MessageConsumer : IMessageConsumer, IDispatcher
private IRedeliveryPolicy redeliveryPolicy;
private Exception failureError;

// Constructor internal to prevent clients from creating an instance.
internal MessageConsumer(Session session, ConsumerInfo info)
// Constructor internal to prevent clients from creating an instance.
internal MessageConsumer(Session session, ConsumerId id, Destination destination, string name, string selector, int prefetch, bool noLocal)
{
this.session = session;
this.info = info;
if(destination == null)
{
throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
}

this.session = session;
this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
this.messageTransformation = this.session.Connection.MessageTransformation;
}

this.info = new ConsumerInfo();
this.info.ConsumerId = id;
this.info.Destination = Destination.Transform(destination);
this.info.SubscriptionName = name;
this.info.Selector = selector;
this.info.PrefetchSize = prefetch;
this.info.MaximumPendingMessageLimit = session.Connection.PrefetchPolicy.MaximumPendingMessageLimit;
this.info.NoLocal = noLocal;
this.info.DispatchAsync = session.DispatchAsync;
this.info.Retroactive = session.Retroactive;
this.info.Exclusive = session.Exclusive;
this.info.Priority = session.Priority;
this.info.AckMode = session.AcknowledgementMode;

// If the destination contained a URI query, then use it to set public properties
// on the ConsumerInfo
if(destination.Options != null)
{
// Get options prefixed with "consumer.*"
StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
// Extract out custom extension options "consumer.nms.*"
StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");

URISupport.SetProperties(this.info, options);
URISupport.SetProperties(this, customConsumerOptions, "nms.");
}
}

~MessageConsumer()
{
@@ -79,7 +111,12 @@ public ConsumerId ConsumerId
get { return info.ConsumerId; }
}

public int PrefetchSize
public ConsumerInfo ConsumerInfo
{
get { return this.info; }
}

public int PrefetchSize
{
get { return this.info.PrefetchSize; }
}
@@ -90,18 +127,26 @@ public IRedeliveryPolicy RedeliveryPolicy
set { this.redeliveryPolicy = value; }
}

private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
set { this.consumerTransformer = value; }
}
// Custom Options
private bool ignoreExpiration = false;
public bool IgnoreExpiration
{
get { return ignoreExpiration; }
set { ignoreExpiration = value; }
}

#endregion
#endregion

#region IMessageConsumer Members

public event MessageListener Listener
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
set { this.consumerTransformer = value; }
}

public event MessageListener Listener
{
add
{
@@ -424,7 +469,7 @@ public void Dispatch(MessageDispatch dispatch)

try
{
bool expired = message.IsExpired();
bool expired = (!IgnoreExpiration && message.IsExpired());

if(!expired)
{
@@ -548,7 +593,7 @@ private MessageDispatch Dequeue(TimeSpan timeout)
{
return null;
}
else if(dispatch.Message.IsExpired())
else if(!IgnoreExpiration && dispatch.Message.IsExpired())
{
Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);

@@ -416,40 +416,47 @@ public IMessageConsumer CreateConsumer(IDestination destination, string selector
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}

ConsumerInfo command = CreateConsumerInfo(destination, selector);
command.NoLocal = noLocal;
ConsumerId consumerId = command.ConsumerId;
int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;

if(destination.IsTopic)
{
prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
}
else if(destination.IsQueue)
{
prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
}

MessageConsumer consumer = null;

// Registered with Connection before we register at the broker.
connection.addDispatcher(consumerId, this);

try
{
consumer = new MessageConsumer(this, command);
Destination dest = destination as Destination;
consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize, noLocal);
consumer.ConsumerTransformer = this.ConsumerTransformer;
consumers[consumerId] = consumer;
this.AddConsumer(consumer);

// lets register the consumer first in case we start dispatching messages immediately
this.Connection.SyncRequest(consumer.ConsumerInfo);

if(this.Started)
if(this.Started)
{
consumer.Start();
}

// lets register the consumer first in case we start dispatching messages immediately
this.Connection.SyncRequest(command);

return consumer;
}
catch(Exception)
{
if(consumer != null)
{
this.RemoveConsumer(consumer);
consumer.Close();
}

throw;
}
}

return consumer;
}

public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
@@ -458,33 +465,26 @@ public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, s
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}

ConsumerInfo command = CreateConsumerInfo(destination, selector);
ConsumerId consumerId = command.ConsumerId;
command.SubscriptionName = name;
command.NoLocal = noLocal;
command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
MessageConsumer consumer = null;

// Registered with Connection before we register at the broker.
connection.addDispatcher(consumerId, this);

try
{
consumer = new MessageConsumer(this, command);
consumer.ConsumerTransformer = this.ConsumerTransformer;
consumers[consumerId] = consumer;
Destination dest = destination as Destination;
consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector, this.connection.PrefetchPolicy.DurableTopicPrefetch, noLocal);
consumer.ConsumerTransformer = this.ConsumerTransformer;
this.AddConsumer(consumer);
this.connection.SyncRequest(consumer.ConsumerInfo);

if(this.Started)
if(this.Started)
{
consumer.Start();
}

this.connection.SyncRequest(command);
}
catch(Exception)
{
if(consumer != null)
{
this.RemoveConsumer(consumer);
consumer.Close();
}

@@ -633,7 +633,26 @@ public void Recover()

#endregion

public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout )
public void AddConsumer(MessageConsumer consumer)
{
if(!this.closing)
{
// Registered with Connection before we register at the broker.
consumers[consumer.ConsumerId] = consumer;
connection.addDispatcher(consumer.ConsumerId, this);
}
}

public void RemoveConsumer(MessageConsumer consumer)
{
connection.removeDispatcher(consumer.ConsumerId);
if(!this.closing)
{
consumers.Remove(consumer.ConsumerId);
}
}

public void DoSend(Message message, MessageProducer producer, TimeSpan sendTimeout)
{
Message msg = message;

@@ -699,52 +718,10 @@ public void DisposeOf(ProducerId objectId)
}
}

protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
{
ConsumerInfo answer = new ConsumerInfo();
ConsumerId id = new ConsumerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref consumerCounter);
answer.ConsumerId = id;
answer.Destination = Destination.Transform(destination);
answer.Selector = selector;
answer.Priority = this.Priority;
answer.Exclusive = this.Exclusive;
answer.DispatchAsync = this.DispatchAsync;
answer.Retroactive = this.Retroactive;
answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
answer.AckMode = this.AcknowledgementMode;

if(destination is ITopic || destination is ITemporaryTopic)
{
answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
}
else if(destination is IQueue || destination is ITemporaryQueue)
{
answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
}

// If the destination contained a URI query, then use it to set public properties
// on the ConsumerInfo
Destination amqDestination = destination as Destination;
if(amqDestination != null && amqDestination.Options != null)
{
StringDictionary options = URISupport.GetProperties(amqDestination.Options, "consumer.");
URISupport.SetProperties(answer, options);
}

return answer;
}

protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
{
ProducerInfo answer = new ProducerInfo();
ProducerId id = new ProducerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref producerCounter);
answer.ProducerId = id;
answer.ProducerId = GetNextProducerId();
answer.Destination = Destination.Transform(destination);

// If the destination contained a URI query, then use it to set public
@@ -759,7 +736,27 @@ protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
return answer;
}

public void Stop()
public ConsumerId GetNextConsumerId()
{
ConsumerId id = new ConsumerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref consumerCounter);

return id;
}

public ProducerId GetNextProducerId()
{
ProducerId id = new ProducerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref producerCounter);

return id;
}

public void Stop()
{
if(this.executor != null)
{

0 comments on commit 31b93ce

Please sign in to comment.