Skip to content
Permalink
Browse files
Refactor the CheckConnected function to handle multiple threads attem…
…pting to check connection status against an offline broker. Guard against unwanted exceptions being thrown when indexing into a connection state array that has not been fully set up because the broker is offline.

Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331)
  • Loading branch information
Jim Gomes committed Jun 18, 2011
1 parent 998db3e commit f329615b24affe538afcde0eee005ce4fa3b3ae8
Showing 5 changed files with 103 additions and 47 deletions.
@@ -501,14 +501,7 @@ protected void Dispose(bool disposing)

public Response SyncRequest(Command command)
{
try
{
return SyncRequest(command, this.RequestTimeout);
}
catch(Exception ex)
{
throw NMSExceptionSupport.Create(ex);
}
return SyncRequest(command, this.RequestTimeout);
}

public Response SyncRequest(Command command, TimeSpan requestTimeout)
@@ -546,7 +539,13 @@ public void Oneway(Command command)
}
}

protected void CheckConnected()
private object connectedLock = new object();

/// <summary>
/// Check and ensure that the connection objcet is connected. If it is not
/// connected or is closed, a ConnectionClosedException is thrown.
/// </summary>
internal void CheckConnected()
{
if(closed.Value)
{
@@ -555,17 +554,61 @@ protected void CheckConnected()

if(!connected.Value)
{
if(!this.userSpecifiedClientID)
DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
int waitCount = 1;

while(true)
{
this.info.ClientId = this.clientIdGenerator.GenerateId();
if(Monitor.TryEnter(connectedLock))
{
try
{
if(closed.Value || closing.Value)
{
break;
}
else if(!connected.Value)
{
if(!this.userSpecifiedClientID)
{
this.info.ClientId = this.clientIdGenerator.GenerateId();
}

try
{
if(null != transport)
{
// Send the connection and see if an ack/nak is returned.
Response response = transport.Request(this.info, this.RequestTimeout);
if(!(response is ExceptionResponse))
{
connected.Value = true;
}
}
}
catch
{
}
}
}
finally
{
Monitor.Exit(connectedLock);
}
}

if(connected.Value || closed.Value || closing.Value || DateTime.Now > timeoutTime)
{
break;
}

// Back off from being overly aggressive. Having too many threads
// aggressively trying to connect to a down broker pegs the CPU.
Thread.Sleep(5 * (waitCount++));
}

connected.Value = true;
// now lets send the connection and see if we get an ack/nak
if(null == SyncRequest(info))
if(!connected.Value)
{
closed.Value = true;
connected.Value = false;
throw new ConnectionClosedException();
}
}
@@ -25,8 +25,7 @@ public class ConnectionState
{

ConnectionInfo info;
private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers =
new AtomicDictionary<ConsumerId, ConsumerState>();
private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);

public ConnectionState(ConnectionInfo info)
@@ -49,26 +48,25 @@ public void reset(ConnectionInfo info)
{
get
{
#if DEBUG
try
ConsumerState consumerState;

if(consumers.TryGetValue(id, out consumerState))
{
#endif
return consumers[id];
#if DEBUG
return consumerState;
}
catch(System.Collections.Generic.KeyNotFoundException ex)

#if DEBUG
// Useful for dignosing missing consumer ids
string consumerList = string.Empty;
foreach(ConsumerId consumerId in consumers.Keys)
{
// Useful for dignosing missing consumer ids
string consumerList = string.Empty;
foreach(ConsumerId consumerId in consumers.Keys)
{
consumerList += consumerId.ToString() + "\n";
}
System.Diagnostics.Debug.Assert(false,
string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
throw ex;
consumerList += consumerId.ToString() + "\n";
}
#endif

System.Diagnostics.Debug.Assert(false,
string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
#endif
return null;
}
}

@@ -80,7 +78,9 @@ public void addConsumer(ConsumerInfo info)

public ConsumerState removeConsumer(ConsumerId id)
{
ConsumerState ret = consumers[id];
ConsumerState ret = null;

consumers.TryGetValue(id, out ret);
consumers.Remove(id);
return ret;
}
@@ -31,8 +31,7 @@ public class ConnectionStateTracker : CommandVisitorAdapter
{
private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);

protected Dictionary<ConnectionId, ConnectionState> connectionStates =
new Dictionary<ConnectionId, ConnectionState>();
protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();

private bool _restoreConsumers = true;

@@ -67,10 +66,10 @@ public void DoRestore(ITransport transport)
{
transport.Oneway(connectionState.Info);

if(RestoreConsumers)
{
DoRestoreConsumers(transport, connectionState);
}
if(RestoreConsumers)
{
DoRestoreConsumers(transport, connectionState);
}
}
}

@@ -97,10 +96,11 @@ public override Response processAddConsumer(ConsumerInfo info)
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
ConnectionState cs = null;

if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.addConsumer(info);
cs.addConsumer(info);
}
}
}
@@ -118,8 +118,9 @@ public override Response processRemoveConsumer(ConsumerId id)
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
ConnectionState cs = null;

if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.removeConsumer(id);
}
@@ -177,6 +177,14 @@ public void Clear()
}
}

public bool TryGetValue(TKey key, out TValue val)
{
lock(((ICollection) _dictionary).SyncRoot)
{
return _dictionary.TryGetValue(key, out val);
}
}

public AtomicCollection<TKey> Keys
{
get
@@ -20,6 +20,7 @@

namespace Apache.NMS.Stomp.Transport
{
/// <summary>
/// A Transport which guards access to the next transport using a mutex.
/// </summary>
public class MutexTransport : TransportFilter
@@ -31,6 +32,7 @@ private void GetTransmissionLock(int timeout)
if(timeout > 0)
{
DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);
int waitCount = 1;

while(true)
{
@@ -44,7 +46,9 @@ private void GetTransmissionLock(int timeout)
throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
}

Thread.Sleep(10);
// Back off from being overly aggressive. Having too many threads
// aggressively trying to get the lock pegs the CPU.
Thread.Sleep(3 * (waitCount++));
}
}
else

0 comments on commit f329615

Please sign in to comment.