Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/hotfix-5.2.13'
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcin Hoppe committed Jan 27, 2016
2 parents ec99bb8 + 24a1718 commit 43a114a
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 5 deletions.
Expand Up @@ -92,6 +92,7 @@
<Compile Include="Basic\When_using_callback_to_get_message.cs" />
<Compile Include="Encryption\When_using_Rijndael_without_incoming_key_identifier.cs" />
<Compile Include="HostInformation\When_feature_overrides_hostid.cs" />
<Compile Include="NonDTC\When_sending_a_message_with_a_ttbr.cs" />
<Compile Include="PerfMon\CriticalTime\When_deferring_a_message.cs" />
<Compile Include="PubSub\When_publishing_from_sendonly.cs" />
<Compile Include="PubSub\When_publishing_an_event_implementing_two_unrelated_interfaces.cs" />
Expand Down
@@ -0,0 +1,130 @@
namespace NServiceBus.AcceptanceTests.NonDTC
{
using System;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Configuration.AdvanceExtensibility;
using NServiceBus.Features;
using NServiceBus.ObjectBuilder;
using NServiceBus.Transports;
using NServiceBus.Unicast;
using NUnit.Framework;

public class When_sending_a_message_with_a_ttbr : NServiceBusAcceptanceTest
{
[Test]
public void Should_honor_it()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<Endpoint>(b => b.Given((bus, c) => bus.SendLocal(new StartMessage())))
.Done(c => c.WasCalled)
.Run();

Assert.AreEqual(TimeSpan.Parse("00:00:10"), context.TTBRUsed);
}

public class Context : ScenarioContext
{
public bool WasCalled { get; set; }
public TimeSpan TTBRUsed { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(
b =>
{
b.GetSettings().Set("DisableOutboxTransportCheck", true);
b.EnableOutbox();
});
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
public Context Context { get; set; }

public IBus Bus { get; set; }

public void Handle(MyMessage message)
{
Context.WasCalled = true;
}
}

public class StartMessageHandler : IHandleMessages<StartMessage>
{
public Context Context { get; set; }

public IBus Bus { get; set; }

public void Handle(StartMessage message)
{
Bus.SendLocal(new MyMessage())
;
}
}

public class DispatcherInterceptor : Feature
{
public DispatcherInterceptor()
{
EnableByDefault();
DependsOn<MsmqTransportConfigurator>();
}

protected override void Setup(FeatureConfigurationContext context)
{
var originalDispatcher = EndpointConfiguration.builder.Build<ISendMessages>();
var ctx = EndpointConfiguration.builder.Build<Context>();
context.Container.ConfigureComponent(() => new SenderWrapper(originalDispatcher, ctx), DependencyLifecycle.SingleInstance);
}
}

public class EndpointConfiguration : IWantToRunBeforeConfigurationIsFinalized
{
public static IBuilder builder;

public void Run(Configure config)
{
builder = config.Builder;
}
}


class SenderWrapper : ISendMessages
{
public SenderWrapper(ISendMessages wrappedSender, Context context)
{
this.wrappedSender = wrappedSender;
this.context = context;
}

public void Send(TransportMessage message, SendOptions sendOptions)
{
if (message.Headers[Headers.EnclosedMessageTypes].Contains("MyMessage"))
{
context.TTBRUsed = message.TimeToBeReceived;
}

wrappedSender.Send(message, sendOptions);
}

Context context;
ISendMessages wrappedSender;
}
}

[TimeToBeReceived("00:00:10")]
public class MyMessage : IMessage
{
}

public class StartMessage : IMessage
{
}
}
}
15 changes: 11 additions & 4 deletions src/NServiceBus.Core/Outbox/OutboxDeduplicationBehavior.cs
Expand Up @@ -34,12 +34,12 @@ public void Invoke(IncomingContext context, Action next)
context.Set(outboxMessage);

//we use this scope to make sure that we escalate to DTC if the user is talking to another resource by misstake
using (var checkForEscalationScope = new TransactionScope(TransactionScopeOption.RequiresNew,new TransactionOptions{IsolationLevel = TransactionSettings.IsolationLevel,Timeout = TransactionSettings.TransactionTimeout}))
using (var checkForEscalationScope = new TransactionScope(TransactionScopeOption.RequiresNew, new TransactionOptions { IsolationLevel = TransactionSettings.IsolationLevel, Timeout = TransactionSettings.TransactionTimeout }))
{
next();
checkForEscalationScope.Complete();
}


if (context.handleCurrentMessageLaterWasCalled)
{
Expand All @@ -65,17 +65,24 @@ void DispatchOperationToTransport(IEnumerable<TransportOperation> operations)
Body = transportOperation.Body
};

string ttbr;

if (transportOperation.Options.TryGetValue("TimeToBeReceived", out ttbr))
{
message.TimeToBeReceived = TimeSpan.Parse(ttbr);
}

//dispatch to transport

if (transportOperation.Options["Operation"] != "Audit")
{
DispatchMessageToTransportBehavior.InvokeNative(deliveryOptions, message);
DispatchMessageToTransportBehavior.InvokeNative(deliveryOptions, message);
}
else
{
DefaultMessageAuditer.Audit(deliveryOptions as SendOptions, message);
}

}
}

Expand Down
10 changes: 9 additions & 1 deletion src/NServiceBus.Core/Outbox/OutboxSendBehavior.cs
Expand Up @@ -15,7 +15,15 @@ public void Invoke(OutgoingContext context, Action next)

if (context.TryGet(out currentOutboxMessage))
{
currentOutboxMessage.TransportOperations.Add( new TransportOperation(context.OutgoingMessage.Id, context.DeliveryOptions.ToTransportOperationOptions(), context.OutgoingMessage.Body, context.OutgoingMessage.Headers));
var options = context.DeliveryOptions.ToTransportOperationOptions();
var transportOperation = new TransportOperation(context.OutgoingMessage.Id, options, context.OutgoingMessage.Body, context.OutgoingMessage.Headers);

if (context.OutgoingMessage.TimeToBeReceived != TimeSpan.MaxValue)
{
options["TimeToBeReceived"] = context.OutgoingMessage.TimeToBeReceived.ToString();
}

currentOutboxMessage.TransportOperations.Add(transportOperation);
}
else
{
Expand Down

0 comments on commit 43a114a

Please sign in to comment.