Skip to content
Permalink
Browse files
Add IDisposable interface to IDestination.
Fixes [AMQNET-473]. (See https://issues.apache.org/jira/browse/AMQNET-473)
Complete provider implementation for ZeroMQ.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)
  • Loading branch information
Jim Gomes committed Mar 12, 2014
1 parent bb89848 commit 41507472e301434a5ac7bfbd938d7ee39a1081e4
Showing 9 changed files with 310 additions and 187 deletions.
@@ -34,6 +34,8 @@ public abstract class Destination : IDestination
protected ZmqSocket consumerEndpoint = null;
protected string destinationName;

private bool disposed = false;

/// <summary>
/// Construct the Destination with a defined physical name.
/// </summary>
@@ -46,16 +48,58 @@ protected Destination(Session session, string destName)

~Destination()
{
// TODO: Implement IDisposable pattern
Dispose(false);
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if(disposed)
{
return;
}

if(disposing)
{
try
{
OnDispose();
}
catch(Exception ex)
{
Tracer.ErrorFormat("Exception disposing Destination {0}: {1}", this.Name, ex.Message);
}
}

disposed = true;
}

/// <summary>
/// Child classes can override this method to perform clean-up logic.
/// </summary>
protected virtual void OnDispose()
{
if(null != this.producerEndpoint)
{
this.session.Connection.ReleaseProducer(this.producerEndpoint);
if(null != this.session
&& null != this.session.Connection)
{
this.session.Connection.ReleaseProducer(this.producerEndpoint);
}

this.producerEndpoint.Dispose();
this.producerEndpoint = null;
}

if(null != this.consumerEndpoint)
{
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
this.consumerEndpoint.Dispose();
this.consumerEndpoint = null;
}
}
@@ -69,52 +113,26 @@ public bool IsTopic
{
get
{
return DestinationType == DestinationType.Topic
|| DestinationType == DestinationType.TemporaryTopic;
return this.DestinationType == DestinationType.Topic
|| this.DestinationType == DestinationType.TemporaryTopic;
}
}

public bool IsQueue
{
get
{
return DestinationType == DestinationType.Queue
|| DestinationType == DestinationType.TemporaryQueue;
return this.DestinationType == DestinationType.Queue
|| this.DestinationType == DestinationType.TemporaryQueue;
}
}

public bool IsTemporary
{
get
{
return DestinationType == DestinationType.TemporaryQueue
|| DestinationType == DestinationType.TemporaryTopic;
}
}

/// <summary>
/// </summary>
/// <returns>string representation of this instance</returns>
public override string ToString()
{
return MakeUriString(this.destinationName);
}

private string MakeUriString(string destName)
{
switch(DestinationType)
{
case DestinationType.Topic:
return "topic://" + destName;

case DestinationType.TemporaryTopic:
return "temp-topic://" + destName;

case DestinationType.TemporaryQueue:
return "temp-queue://" + destName;

default:
return "queue://" + destName;
return this.DestinationType == DestinationType.TemporaryQueue
|| this.DestinationType == DestinationType.TemporaryTopic;
}
}

@@ -49,35 +49,35 @@ public ConsumerTransformerDelegate ConsumerTransformer
set { this.consumerTransformer = value; }
}

public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination dest, string selector)
public MessageConsumer(Session sess, AcknowledgementMode ackMode, IDestination dest, string selector)
{
// UNUSED_PARAM(selector); // Selectors are not currently supported

if(null == session.Connection.Context)
if(null == sess.Connection.Context)
{
throw new NMSConnectionException();
}

this.session = session;
this.session = sess;
this.destination = (Destination) dest;
this.acknowledgementMode = acknowledgementMode;
this.acknowledgementMode = ackMode;
}

public event MessageListener Listener
{
add
{
listener += value;
listenerCount++;
this.listener += value;
this.listenerCount++;
StartAsyncDelivery();
}

remove
{
if(listenerCount > 0)
if(this.listenerCount > 0)
{
listener -= value;
listenerCount--;
this.listener -= value;
this.listenerCount--;
}

if(0 == listenerCount)
@@ -152,39 +152,39 @@ protected virtual void StopAsyncDelivery()
{
lock(asyncDeliveryLock)
{
asyncDelivery = false;
if(null != asyncDeliveryThread)
this.asyncDelivery = false;
if(null != this.asyncDeliveryThread)
{
Tracer.Info("Stopping async delivery thread.");
asyncDeliveryThread.Interrupt();
if(!asyncDeliveryThread.Join(10000))
this.asyncDeliveryThread.Interrupt();
if(!this.asyncDeliveryThread.Join(10000))
{
Tracer.Info("Aborting async delivery thread.");
asyncDeliveryThread.Abort();
this.asyncDeliveryThread.Abort();
}

asyncDeliveryThread = null;
this.asyncDeliveryThread = null;
Tracer.Info("Async delivery thread stopped.");
}
}
}

protected virtual void StartAsyncDelivery()
{
Debug.Assert(null == asyncDeliveryThread);
lock(asyncDeliveryLock)
Debug.Assert(null == this.asyncDeliveryThread);
lock(this.asyncDeliveryLock)
{
asyncDelivery = true;
asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
asyncDeliveryThread.IsBackground = true;
asyncDeliveryThread.Start();
this.asyncDelivery = true;
this.asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
this.asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
this.asyncDeliveryThread.IsBackground = true;
this.asyncDeliveryThread.Start();
}
}

protected virtual void DispatchLoop()
{
Tracer.Info("Starting dispatcher thread consumer: " + this);
Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
TimeSpan receiveWait = TimeSpan.FromSeconds(3);

while(asyncDelivery)
@@ -214,12 +214,12 @@ protected virtual void DispatchLoop()
Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
}
}
Tracer.Info("Stopped dispatcher thread consumer: " + this);
Tracer.InfoFormat("Stopped dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
}

protected virtual void HandleAsyncException(Exception e)
{
session.Connection.HandleException(e);
this.session.Connection.HandleException(e);
}

/// <summary>

0 comments on commit 4150747

Please sign in to comment.