Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'TimeoutMessageBugfix' of https://github.com/RyanHauert/…

…rhino-esb into rhauert
  • Loading branch information...
commit b198c6b4b2c8885b0ad3434b0f3e02464cdf7842 2 parents 57fcff5 + d3b99ac
@CoreyKaylor CoreyKaylor authored
View
28 Rhino.ServiceBus.Tests/MessageModuleTests.cs
@@ -132,20 +132,44 @@ public void Can_register_to_get_message_completion_notification()
Assert.True(Module1.Completion);
}
+ [Fact]
+ public void Can_register_to_get_transaction_commit_notification()
+ {
+ using (var serviceBus = (DefaultServiceBus)container.Resolve<IServiceBus>())
+ {
+ serviceBus.Start();
+ Module1.TransactionCommitResetEvent = new ManualResetEvent(false);
+ Module1.TransactionCommit = false;
+
+ serviceBus.Send(serviceBus.Endpoint, "transaction");
+ Module1.TransactionCommitResetEvent.WaitOne(TimeSpan.FromSeconds(30), false);
+ }
+ Assert.True(Module1.TransactionCommit);
+ }
+
public class Module1 : IMessageModule
{
public static Exception Exception;
public static ManualResetEvent ErrorResetEvent;
public static ManualResetEvent CompletionResetEvent;
+ public static ManualResetEvent TransactionCommitResetEvent;
public static bool Completion = true;
+ public static bool TransactionCommit;
- public void Init(ITransport transport, IServiceBus bus)
+ public void Init(ITransport transport, IServiceBus bus)
{
+ transport.BeforeMessageTransactionCommit += Transport_BeforeMessageTransactionCommit;
transport.MessageProcessingFailure+=Transport_OnMessageProcessingFailure;
transport.MessageProcessingCompleted+=Transport_OnMessageProcessingCompleted;
}
- private static void Transport_OnMessageProcessingCompleted(CurrentMessageInformation t, Exception e)
+ private void Transport_BeforeMessageTransactionCommit(CurrentMessageInformation obj)
+ {
+ TransactionCommit = true;
+ TransactionCommitResetEvent.Set();
+ }
+
+ private static void Transport_OnMessageProcessingCompleted(CurrentMessageInformation t, Exception e)
{
Completion = true;
CompletionResetEvent.Set();
View
1  Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
@@ -186,6 +186,7 @@
<Compile Include="RhinoQueues\UsingRhinoQueuesBusWithAlternateStorageLocation.cs" />
<Compile Include="RhinoQueues\UsingRhinoQueuesTransport.cs" />
<Compile Include="RhinoQueues\WhenErrorOccurs.cs" />
+ <Compile Include="RhinoQueues\WhenReceivingTimedMessage.cs" />
<Compile Include="RhinoQueues\WithDebugging.cs" />
<Compile Include="SagaFinderTests.cs" />
<Compile Include="SagaTests.cs" />
View
89 Rhino.ServiceBus.Tests/RhinoQueues/WhenReceivingTimedMessage.cs
@@ -0,0 +1,89 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Transactions;
+using Castle.Windsor;
+using Rhino.ServiceBus.Castle;
+using Rhino.ServiceBus.Impl;
+using Rhino.ServiceBus.RhinoQueues;
+using Rhino.ServiceBus.Serializers;
+using Xunit;
+
+namespace Rhino.ServiceBus.Tests.RhinoQueues
+{
+ public class WhenReceivingTimedMessage : IDisposable
+ {
+ private readonly RhinoQueuesTransport transport;
+ private readonly ManualResetEvent wait = new ManualResetEvent(false);
+ private readonly XmlMessageSerializer messageSerializer;
+
+ public WhenReceivingTimedMessage()
+ {
+ if (Directory.Exists("test.esent"))
+ Directory.Delete("test.esent", true);
+
+ messageSerializer = new XmlMessageSerializer(new DefaultReflection(),
+ new CastleServiceLocator(new WindsorContainer()));
+ transport = new RhinoQueuesTransport(
+ new Uri("rhino.queues://localhost:23456/q"),
+ new EndpointRouter(),
+ messageSerializer,
+ 1,
+ "test.esent",
+ IsolationLevel.Serializable,
+ 5,
+ false,
+ new RhinoQueuesMessageBuilder(messageSerializer)
+ );
+ transport.Start();
+ }
+
+ [Fact]
+ public void Raises_message_arrived()
+ {
+ transport.MessageArrived += information =>
+ {
+ wait.Set();
+ return true;
+ };
+
+ bool signaled = SendTimedMessage();
+ Assert.True(signaled);
+ }
+
+ [Fact]
+ private void Raises_message_processing_completed()
+ {
+ transport.MessageProcessingCompleted += (information, ex) => wait.Set();
+
+ bool signaled = SendTimedMessage();
+ Assert.True(signaled);
+ }
+
+ [Fact]
+ public void Raises_before_message_transaction_commit()
+ {
+ transport.BeforeMessageTransactionCommit += information => wait.Set();
+
+ bool signaled = SendTimedMessage();
+ Assert.True(signaled);
+ }
+
+ public void Dispose()
+ {
+ transport.Dispose();
+ wait.Close();
+ }
+
+ private bool SendTimedMessage()
+ {
+ using (var tx = new TransactionScope())
+ {
+ transport.Send(transport.Endpoint, DateTime.Now.AddSeconds(3), new object[] { "test" });
+ tx.Complete();
+ }
+
+ return wait.WaitOne(TimeSpan.FromSeconds(5), false);
+ }
+ }
+}
View
2  Rhino.ServiceBus/Rhino.ServiceBus.csproj
@@ -26,7 +26,7 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
- <NoWarn>1607</NoWarn>
+ <NoWarn>1607,1591</NoWarn>
<DocumentationFile>bin\Debug\Rhino.ServiceBus.xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
View
2  Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs
@@ -335,7 +335,7 @@ private void ReceiveMessage(object context)
ProcessMessage(message, tx,
MessageArrived,
MessageProcessingCompleted,
- null);
+ BeforeMessageTransactionCommit);
}
break;
default:

0 comments on commit b198c6b

Please sign in to comment.
Something went wrong with that request. Please try again.