From 15092e92c4a970cb9c9b07474447f36bbff18e85 Mon Sep 17 00:00:00 2001 From: andreasohlund Date: Sun, 24 Jan 2016 12:09:27 +0700 Subject: [PATCH] Outbox now handles TTBR properly Making sure that outbox stores TTBR in options --- .../NServiceBus.AcceptanceTests.csproj | 1 + .../When_sending_a_message_with_a_ttbr.cs | 130 ++++++++++++++++++ .../Outbox/OutboxDeduplicationBehavior.cs | 15 +- .../Outbox/OutboxSendBehavior.cs | 10 +- 4 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 src/NServiceBus.AcceptanceTests/NonDTC/When_sending_a_message_with_a_ttbr.cs diff --git a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj index 0d5a22ccab..31e4ee13e9 100644 --- a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj +++ b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj @@ -92,6 +92,7 @@ + diff --git a/src/NServiceBus.AcceptanceTests/NonDTC/When_sending_a_message_with_a_ttbr.cs b/src/NServiceBus.AcceptanceTests/NonDTC/When_sending_a_message_with_a_ttbr.cs new file mode 100644 index 0000000000..f27bd8f173 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/NonDTC/When_sending_a_message_with_a_ttbr.cs @@ -0,0 +1,130 @@ +namespace NServiceBus.AcceptanceTests.NonDTC +{ + using System; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NServiceBus.Configuration.AdvanceExtensibility; + using NServiceBus.Features; + using NServiceBus.ObjectBuilder; + using NServiceBus.Transports; + using NServiceBus.Unicast; + using NUnit.Framework; + + public class When_sending_a_message_with_a_ttbr : NServiceBusAcceptanceTest + { + [Test] + public void Should_honor_it() + { + var context = new Context(); + + Scenario.Define(context) + .WithEndpoint(b => b.Given((bus, c) => bus.SendLocal(new StartMessage()))) + .Done(c => c.WasCalled) + .Run(); + + Assert.AreEqual(TimeSpan.Parse("00:00:10"), context.TTBRUsed); + } + + public class Context : ScenarioContext + { + public bool WasCalled { get; set; } + public TimeSpan TTBRUsed { get; set; } + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() + { + EndpointSetup( + b => + { + b.GetSettings().Set("DisableOutboxTransportCheck", true); + b.EnableOutbox(); + }); + } + + public class MyMessageHandler : IHandleMessages + { + public Context Context { get; set; } + + public IBus Bus { get; set; } + + public void Handle(MyMessage message) + { + Context.WasCalled = true; + } + } + + public class StartMessageHandler : IHandleMessages + { + public Context Context { get; set; } + + public IBus Bus { get; set; } + + public void Handle(StartMessage message) + { + Bus.SendLocal(new MyMessage()) +; + } + } + + public class DispatcherInterceptor : Feature + { + public DispatcherInterceptor() + { + EnableByDefault(); + DependsOn(); + } + + protected override void Setup(FeatureConfigurationContext context) + { + var originalDispatcher = EndpointConfiguration.builder.Build(); + var ctx = EndpointConfiguration.builder.Build(); + context.Container.ConfigureComponent(() => new SenderWrapper(originalDispatcher, ctx), DependencyLifecycle.SingleInstance); + } + } + + public class EndpointConfiguration : IWantToRunBeforeConfigurationIsFinalized + { + public static IBuilder builder; + + public void Run(Configure config) + { + builder = config.Builder; + } + } + + + class SenderWrapper : ISendMessages + { + public SenderWrapper(ISendMessages wrappedSender, Context context) + { + this.wrappedSender = wrappedSender; + this.context = context; + } + + public void Send(TransportMessage message, SendOptions sendOptions) + { + if (message.Headers[Headers.EnclosedMessageTypes].Contains("MyMessage")) + { + context.TTBRUsed = message.TimeToBeReceived; + } + + wrappedSender.Send(message, sendOptions); + } + + Context context; + ISendMessages wrappedSender; + } + } + + [TimeToBeReceived("00:00:10")] + public class MyMessage : IMessage + { + } + + public class StartMessage : IMessage + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Outbox/OutboxDeduplicationBehavior.cs b/src/NServiceBus.Core/Outbox/OutboxDeduplicationBehavior.cs index 3a0978ba87..24c710dca4 100644 --- a/src/NServiceBus.Core/Outbox/OutboxDeduplicationBehavior.cs +++ b/src/NServiceBus.Core/Outbox/OutboxDeduplicationBehavior.cs @@ -34,12 +34,12 @@ public void Invoke(IncomingContext context, Action next) context.Set(outboxMessage); //we use this scope to make sure that we escalate to DTC if the user is talking to another resource by misstake - using (var checkForEscalationScope = new TransactionScope(TransactionScopeOption.RequiresNew,new TransactionOptions{IsolationLevel = TransactionSettings.IsolationLevel,Timeout = TransactionSettings.TransactionTimeout})) + using (var checkForEscalationScope = new TransactionScope(TransactionScopeOption.RequiresNew, new TransactionOptions { IsolationLevel = TransactionSettings.IsolationLevel, Timeout = TransactionSettings.TransactionTimeout })) { next(); checkForEscalationScope.Complete(); } - + if (context.handleCurrentMessageLaterWasCalled) { @@ -65,17 +65,24 @@ void DispatchOperationToTransport(IEnumerable operations) Body = transportOperation.Body }; + string ttbr; + + if (transportOperation.Options.TryGetValue("TimeToBeReceived", out ttbr)) + { + message.TimeToBeReceived = TimeSpan.Parse(ttbr); + } + //dispatch to transport if (transportOperation.Options["Operation"] != "Audit") { - DispatchMessageToTransportBehavior.InvokeNative(deliveryOptions, message); + DispatchMessageToTransportBehavior.InvokeNative(deliveryOptions, message); } else { DefaultMessageAuditer.Audit(deliveryOptions as SendOptions, message); } - + } } diff --git a/src/NServiceBus.Core/Outbox/OutboxSendBehavior.cs b/src/NServiceBus.Core/Outbox/OutboxSendBehavior.cs index 9e71ddc9cd..e2fa93aaf6 100644 --- a/src/NServiceBus.Core/Outbox/OutboxSendBehavior.cs +++ b/src/NServiceBus.Core/Outbox/OutboxSendBehavior.cs @@ -15,7 +15,15 @@ public void Invoke(OutgoingContext context, Action next) if (context.TryGet(out currentOutboxMessage)) { - currentOutboxMessage.TransportOperations.Add( new TransportOperation(context.OutgoingMessage.Id, context.DeliveryOptions.ToTransportOperationOptions(), context.OutgoingMessage.Body, context.OutgoingMessage.Headers)); + var options = context.DeliveryOptions.ToTransportOperationOptions(); + var transportOperation = new TransportOperation(context.OutgoingMessage.Id, options, context.OutgoingMessage.Body, context.OutgoingMessage.Headers); + + if (context.OutgoingMessage.TimeToBeReceived != TimeSpan.MaxValue) + { + options["TimeToBeReceived"] = context.OutgoingMessage.TimeToBeReceived.ToString(); + } + + currentOutboxMessage.TransportOperations.Add(transportOperation); } else {