Skip to content
Permalink
Browse files
  • Loading branch information
Timothy A. Bish committed Jan 26, 2011
1 parent dc3990f commit 13eab13dec1bb952a0a863a1ac4eec73d4cf761b
Showing 4 changed files with 514 additions and 43 deletions.
@@ -20,6 +20,7 @@
using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Threads;
using Apache.NMS.Stomp.Transport;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Util;
@@ -52,9 +53,11 @@ public class Connection : IConnection
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
private bool connected = false;
private bool closed = false;
private bool closing = false;
private readonly Atomic<bool> connected = new Atomic<bool>(false);
private readonly Atomic<bool> closed = new Atomic<bool>(false);
private readonly Atomic<bool> closing = new Atomic<bool>(false);
private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
private Exception firstFailureError = null;
private int sessionCounter = 0;
private int temporaryDestinationCounter = 0;
private int localTransactionCounter;
@@ -64,6 +67,7 @@ public class Connection : IConnection
private readonly IdGenerator clientIdGenerator;
private CountDownLatch transportInterruptionProcessingComplete;
private readonly MessageTransformation messageTransformation;
private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();

public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -72,7 +76,7 @@ public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdG

this.transport = transport;
this.transport.Command = new CommandHandler(OnCommand);
this.transport.Exception = new ExceptionHandler(OnException);
this.transport.Exception = new ExceptionHandler(OnTransportException);
this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
this.transport.Resumed = new ResumedHandler(OnTransportResumed);

@@ -215,6 +219,16 @@ public ITransport ITransport
set { this.transport = value; }
}

public bool TransportFailed
{
get { return this.transportFailed.Value; }
}

public Exception FirstFailureError
{
get { return this.firstFailureError; }
}

public TimeSpan RequestTimeout
{
get { return this.requestTimeout; }
@@ -232,7 +246,7 @@ public string ClientId
get { return info.ClientId; }
set
{
if(this.connected)
if(this.connected.Value)
{
throw new NMSException("You cannot change the ClientId once the Connection is connected");
}
@@ -384,7 +398,7 @@ public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)

internal void RemoveSession(Session session)
{
if(!this.closing)
if(!this.closing.Value)
{
sessions.Remove(session);
}
@@ -404,15 +418,15 @@ public void Close()
{
lock(myLock)
{
if(this.closed)
if(this.closed.Value)
{
return;
}

try
{
Tracer.Info("Closing Connection.");
this.closing = true;
this.closing.Value = true;
lock(sessions.SyncRoot)
{
foreach(Session session in sessions)
@@ -422,7 +436,7 @@ public void Close()
}
sessions.Clear();

if(connected)
if(connected.Value)
{
ShutdownInfo shutdowninfo = new ShutdownInfo();
transport.Oneway(shutdowninfo);
@@ -438,9 +452,9 @@ public void Close()
finally
{
this.transport = null;
this.closed = true;
this.connected = false;
this.closing = false;
this.closed.Value = true;
this.connected.Value = false;
this.closing.Value = false;
}
}
}
@@ -534,24 +548,24 @@ public void Oneway(Command command)

protected void CheckConnected()
{
if(closed)
if(closed.Value)
{
throw new ConnectionClosedException();
}

if(!connected)
if(!connected.Value)
{
if(!this.userSpecifiedClientID)
{
this.info.ClientId = this.clientIdGenerator.GenerateId();
}

connected = true;
connected.Value = true;
// now lets send the connection and see if we get an ack/nak
if(null == SyncRequest(info))
{
closed = true;
connected = false;
closed.Value = true;
connected.Value = false;
throw new ConnectionClosedException();
}
}
@@ -581,7 +595,7 @@ protected void OnCommand(ITransport commandTransport, Command command)
}
else if(command.IsErrorCommand)
{
if(!closing && !closed)
if(!closing.Value && !closed.Value)
{
ConnectionError connectionError = (ConnectionError) command;
BrokerError brokerError = connectionError.Exception;
@@ -597,7 +611,7 @@ protected void OnCommand(ITransport commandTransport, Command command)
}
}

OnException(commandTransport, new NMSConnectionException(message, cause));
OnException(new NMSConnectionException(message, cause));
}
}
else
@@ -632,17 +646,85 @@ protected void DispatchMessage(MessageDispatch dispatch)
Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
}

protected void OnException(ITransport sender, Exception exception)
protected void OnTransportException(ITransport sender, Exception exception)
{
this.OnException(exception);
}

internal void OnAsyncException(Exception error)
{
if(!this.closed.Value && !this.closing.Value)
{
if(this.ExceptionListener != null)
{
if(!(error is NMSException))
{
error = NMSExceptionSupport.Create(error);
}
NMSException e = (NMSException)error;

// Called in another thread so that processing can continue
// here, ensures no lock contention.
executor.QueueUserWorkItem(AsyncCallExceptionListener, e);
}
else
{
Tracer.Debug("Async exception with no exception listener: " + error);
}
}
}

private void AsyncCallExceptionListener(object error)
{
NMSException exception = error as NMSException;
this.ExceptionListener(exception);
}

internal void OnException(Exception error)
{
// Will fire an exception listener callback if there's any set.
OnAsyncException(error);

if(!this.closing.Value && !this.closed.Value)
{
// Perform the actual work in another thread to avoid lock contention
// and allow the caller to continue on in its error cleanup.
executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
}
}

private void AsyncOnExceptionHandler(object error)
{
if(ExceptionListener != null && !this.closing)
Exception cause = error as Exception;

MarkTransportFailed(cause);

try
{
this.transport.Dispose();
}
catch(Exception ex)
{
Tracer.Debug("Caught Exception While disposing of Transport: " + ex);
}

IList sessionsCopy = null;
lock(this.sessions.SyncRoot)
{
sessionsCopy = new ArrayList(this.sessions);
}

// Use a copy so we don't concurrently modify the Sessions list if the
// client is closing at the same time.
foreach(Session session in sessionsCopy)
{
try
{
ExceptionListener(exception);
session.Dispose();
}
catch
catch(Exception ex)
{
sender.Dispose();
Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
}
}
}
@@ -662,7 +744,7 @@ protected void OnTransportInterrupted(ITransport sender)
session.ClearMessagesInProgress();
}

if(this.ConnectionInterruptedListener != null && !this.closing)
if(this.ConnectionInterruptedListener != null && !this.closing.Value)
{
try
{
@@ -678,7 +760,7 @@ protected void OnTransportResumed(ITransport sender)
{
Tracer.Debug("Transport has resumed normal operation.");

if(this.ConnectionResumedListener != null && !this.closing)
if(this.ConnectionResumedListener != null && !this.closing.Value)
{
try
{
@@ -705,6 +787,15 @@ internal void OnSessionException(Session sender, Exception exception)
}
}

private void MarkTransportFailed(Exception error)
{
this.transportFailed.Value = true;
if(this.firstFailureError == null)
{
this.firstFailureError = error;
}
}

/// <summary>
/// Creates a new temporary destination name
/// </summary>
@@ -739,7 +830,7 @@ private void WaitForTransportInterruptionProcessingToComplete()
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if(cdl != null)
{
if(!closed && cdl.Remaining > 0)
if(!closed.Value && cdl.Remaining > 0)
{
Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
"processing (" + cdl.Remaining + ") to complete..");

0 comments on commit 13eab13

Please sign in to comment.