Skip to content
Permalink
Browse files
Add support for transport timeout while waiting for mutex lock.
  • Loading branch information
Jim Gomes committed Jun 14, 2011
1 parent ea5f43c commit 998db3e2e7aceb716c2a5f3a41fcbcb05b5b6a70
Showing 6 changed files with 132 additions and 9 deletions.
@@ -55,6 +55,7 @@ public class FailoverTransport : ICompositeTransport, IComparable
private bool started;

private int timeout = -1;
private int asyncTimeout = 45000;
private int initialReconnectDelay = 10;
private int maxReconnectDelay = 1000 * 30;
private int backOffMultiplier = 2;
@@ -217,7 +218,17 @@ public bool UseExponentialBackOff

#endregion

public bool IsFaultTolerant
/// <summary>
/// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
/// </summary>
/// <value>The async timeout.</value>
public int AsyncTimeout
{
get { return asyncTimeout; }
set { asyncTimeout = value; }
}

public bool IsFaultTolerant
{
get { return true; }
}
@@ -70,6 +70,26 @@ public interface ITransport : IStartable, IDisposable, IStoppable
/// </summary>
Object Narrow(Type type);

/// <summary>
/// Timeout in milliseconds to wait for sending synchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
int Timeout
{
get;
set;
}

/// <summary>
/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
int AsyncTimeout
{
get;
set;
}

CommandHandler Command
{
get;
@@ -14,44 +14,86 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Apache.NMS.Stomp.Commands;
using System;
using System.Threading;
using Apache.NMS.Stomp.Commands;

namespace Apache.NMS.Stomp.Transport
{
/// <summary>
/// A Transport which guards access to the next transport using a mutex.
/// </summary>
public class MutexTransport : TransportFilter
{
private readonly object transmissionLock = new object();

private void GetTransmissionLock(int timeout)
{
if(timeout > 0)
{
DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);

while(true)
{
if(Monitor.TryEnter(transmissionLock))
{
break;
}

if(DateTime.Now > timeoutTime)
{
throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
}

Thread.Sleep(10);
}
}
else
{
Monitor.Enter(transmissionLock);
}
}

public MutexTransport(ITransport next) : base(next)
{
}

public override void Oneway(Command command)
{
lock(transmissionLock)
GetTransmissionLock(this.next.Timeout);
try
{
base.Oneway(command);
}
finally
{
this.next.Oneway(command);
Monitor.Exit(transmissionLock);
}
}

public override FutureResponse AsyncRequest(Command command)
{
lock(transmissionLock)
GetTransmissionLock(this.next.AsyncTimeout);
try
{
return base.AsyncRequest(command);
}
finally
{
Monitor.Exit(transmissionLock);
}
}

public override Response Request(Command command, TimeSpan timeout)
{
lock(transmissionLock)
GetTransmissionLock((int) timeout.TotalMilliseconds);
try
{
return base.Request(command, timeout);
}
finally
{
Monitor.Exit(transmissionLock);
}
}
}
}
@@ -40,6 +40,8 @@ public class TcpTransport : ITransport
private readonly Atomic<bool> closed = new Atomic<bool>(false);
private volatile bool seenShutdown;
private readonly Uri connectedUri;
private int timeout = -1;
private int asynctimeout = -1;

private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
@@ -314,7 +316,27 @@ public void ReadLoop()

// Implementation methods

public CommandHandler Command
/// <summary>
/// Timeout in milliseconds to wait for sending synchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
public int Timeout
{
get { return this.timeout; }
set { this.timeout = value; }
}

/// <summary>
/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
public int AsyncTimeout
{
get { return this.asynctimeout; }
set { this.asynctimeout = value; }
}

public CommandHandler Command
{
get { return commandHandler; }
set { this.commandHandler = value; }
@@ -206,6 +206,26 @@ public Object Narrow(Type type)
return null;
}

/// <summary>
/// Timeout in milliseconds to wait for sending synchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
public int Timeout
{
get { return next.Timeout; }
set { next.Timeout = value; }
}

/// <summary>
/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
public int AsyncTimeout
{
get { return next.AsyncTimeout; }
set { next.AsyncTimeout = value; }
}

public bool IsFaultTolerant
{
get{ return next.IsFaultTolerant; }
@@ -3,7 +3,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>9.0.21022</ProductVersion>
<ProductVersion>9.0.30729</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{E8C995C3-FF81-491B-A3B7-9D7C753BDDC3}</ProjectGuid>
<OutputType>Library</OutputType>
@@ -55,6 +55,14 @@
<NoWarn>3016</NoWarn>
</PropertyGroup>
<ItemGroup>
<Reference Include="Apache.NMS, Version=1.5.1.2341, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>lib\Apache.NMS\net-2.0\Apache.NMS.dll</HintPath>
</Reference>
<Reference Include="Apache.NMS.Test, Version=1.5.1.2341, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>lib\Apache.NMS\net-2.0\Apache.NMS.Test.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Xml" />
<Reference Include="nunit.framework, Version=2.5.8.10295, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77">

0 comments on commit 998db3e

Please sign in to comment.