Skip to content

Commit

Permalink
Throw from NServiceBus.MsmqUtilities.Convert will result in message loss
Browse files Browse the repository at this point in the history
fixes #2583
  • Loading branch information
SimonCropp committed Nov 26, 2014
1 parent 9462101 commit 38b5b3c
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public EndpointConfigurationBuilder CustomEndpointName(string customEndpointName
return this;
}

public EndpointConfigurationBuilder AllowExceptions()
{
configuration.AllowExceptions = true;

return this;
}

public EndpointConfigurationBuilder AddMapping<T>(Type endpoint)
{
configuration.EndpointMappings.Add(typeof(T),endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public string EndpointName

public Type AuditEndpoint { get; set; }

public bool AllowExceptions { get; set; }

string endpointName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace NServiceBus.AcceptanceTests.BasicMessaging
{
using System;
using NUnit.Framework;

public class When_starting_a_send_only : NServiceBusAcceptanceTest
{
[Test]
public void Should_not_need_audit_or_fault_forwarding_config_to_start()
{
using ((IDisposable)Configure.With(new Type[]
{
})
.DefaultBuilder()
.UnicastBus()
.SendOnly())
{
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ static void SetupLogging(EndpointConfiguration endpointConfiguration, ScenarioCo

try
{
SetLoggingLibrary.Log4Net(null, new ContextAppender(scenarioContext));
SetLoggingLibrary.Log4Net(null, new ContextAppender(scenarioContext, endpointConfiguration));
}
catch (Exception ex)
{
Expand All @@ -105,23 +105,29 @@ static void SetupLogging(EndpointConfiguration endpointConfiguration, ScenarioCo
}
public class ContextAppender : AppenderSkeleton
{
public ContextAppender(ScenarioContext context)
public ContextAppender(ScenarioContext context, EndpointConfiguration endpointConfiguration)
{
this.context = context;
this.endpointConfiguration = endpointConfiguration;
}

protected override void Append(LoggingEvent loggingEvent)
{
if (loggingEvent.ExceptionObject != null)
if (!endpointConfiguration.AllowExceptions && loggingEvent.ExceptionObject != null)
{
lock (context)
{
context.Exceptions += loggingEvent.ExceptionObject + "/n/r";
}
}
if (loggingEvent.Level >= Level.Warn)
{
context.RecordLog(endpointConfiguration.EndpointName, loggingEvent.Level.ToString(), loggingEvent.RenderedMessage);
}

}

ScenarioContext context;
EndpointConfiguration endpointConfiguration;
}
}
17 changes: 17 additions & 0 deletions src/NServiceBus.AcceptanceTests/Exceptions/SerializerCorrupter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NServiceBus.AcceptanceTests.Exceptions
{
using System;
using System.Reflection;

static class SerializerCorrupter
{

public static void Corrupt()
{
var msmqUtilitiesType = Type.GetType("NServiceBus.Transports.Msmq.MsmqUtilities, NServiceBus.Core");
var headerSerializerField = msmqUtilitiesType.GetField("headerSerializer", BindingFlags.Static | BindingFlags.NonPublic);
headerSerializerField.SetValue(null, null);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace NServiceBus.AcceptanceTests.Exceptions
{
using System;
using System.Linq;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_cant_convert_to_TransportMessage : NServiceBusAcceptanceTest
{
[Test]
public void Should_send_message_to_error_queue()
{
Scenario.Define<Context>()
.WithEndpoint<Sender>(b => b.Given(bus => bus.Send(new Message())))
.WithEndpoint<Receiver>()
.Done(c => c.GetAllLogs().Any(l => l.Level == "error"))
.Repeat(r => r.For(ScenarioDescriptors.Transports.Msmq))
.Should(c =>
{
var logs = c.GetAllLogs();
Assert.True(logs.Any(l => l.Message.Contains("is corrupt and will be moved to")));
})
.Run();
}

public class Context : ScenarioContext
{
}

public class Sender : EndpointConfigurationBuilder
{
public Sender()
{
EndpointSetup<DefaultServer>()
.AddMapping<Message>(typeof(Receiver));
}
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
SerializerCorrupter.Corrupt();
EndpointSetup<DefaultServer>()
.AllowExceptions();
}

}

[Serializable]
public class Message : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace NServiceBus.AcceptanceTests.Exceptions
{
using System;
using System.Linq;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using IMessage = NServiceBus.IMessage;

public class When_cant_convert_to_TransportMessage_NoTransactions : NServiceBusAcceptanceTest
{
[Test]
public void Should_send_message_to_error_queue()
{
Scenario.Define<Context>()
.WithEndpoint<Sender>(b => b.Given(bus => bus.Send(new Message())))
.WithEndpoint<Receiver>()
.Done(c => c.GetAllLogs().Any(l => l.Level == "error"))
.Repeat(r => r.For(ScenarioDescriptors.Transports.Msmq))
.Should(c =>
{
var logs = c.GetAllLogs();
Assert.True(logs.Any(l => l.Message.Contains("is corrupt and will be moved to")));
})
.Run();
}

public class Context : ScenarioContext
{
}

public class Sender : EndpointConfigurationBuilder
{
public Sender()
{
Configure.Transactions.Disable();
EndpointSetup<DefaultServer>()
.AddMapping<Message>(typeof(Receiver));
}
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
SerializerCorrupter.Corrupt();
Configure.Transactions.Disable();
EndpointSetup<DefaultServer>()
.AllowExceptions();
}
}

[Serializable]
public class Message : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace NServiceBus.AcceptanceTests.Exceptions
{
using System;
using System.Linq;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_cant_convert_to_TransportMessage_SuppressedDTC : NServiceBusAcceptanceTest
{
[Test]
public void Should_send_message_to_error_queue()
{
Scenario.Define<Context>()
.WithEndpoint<Sender>(b => b.Given(bus => bus.Send(new Message())))
.WithEndpoint<Receiver>()
.Done(c => c.GetAllLogs().Any(l => l.Level == "error"))
.Repeat(r => r.For(ScenarioDescriptors.Transports.Msmq))
.Should(c =>
{
var logs = c.GetAllLogs();
Assert.True(logs.Any(l => l.Message.Contains("is corrupt and will be moved to")));
})
.Run();
}

public class Context : ScenarioContext
{
}

public class Sender : EndpointConfigurationBuilder
{
public Sender()
{
Configure.Transactions.Advanced(settings => settings.DisableDistributedTransactions());
EndpointSetup<DefaultServer>()
.AddMapping<Message>(typeof(Receiver));
}
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
SerializerCorrupter.Corrupt();
Configure.Transactions.Advanced(settings => settings.DisableDistributedTransactions());
EndpointSetup<DefaultServer>()
.AllowExceptions();
}
}

[Serializable]
public class Message : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
<ItemGroup>
<Compile Include="Audit\When_using_auditing_as_a_feature.cs" />
<Compile Include="Audit\When_a_message_is_audited.cs" />
<Compile Include="BasicMessaging\When_starting_a_send_only.cs" />
<Compile Include="BasicMessaging\MessingAround.cs" />
<Compile Include="BasicMessaging\When_aborting_the_behavior_chain.cs" />
<Compile Include="BasicMessaging\When_handling_current_message_later.cs" />
Expand All @@ -121,6 +122,10 @@
<Compile Include="Encryption\When_using_encryption_with_custom_service.cs" />
<Compile Include="Encryption\When_using_encryption_with_multikey.cs" />
<Compile Include="EndpointTemplates\BusExtensions.cs" />
<Compile Include="Exceptions\When_cant_convert_to_TransportMessage_NoTransactions.cs" />
<Compile Include="Exceptions\When_cant_convert_to_TransportMessage_SuppressedDTC.cs" />
<Compile Include="Exceptions\SerializerCorrupter.cs" />
<Compile Include="Exceptions\When_cant_convert_to_TransportMessage.cs" />
<Compile Include="Gateway\When_sending_a_message_via_the_gateway.cs" />
<Compile Include="Gateway\When_doing_request_response_between_sites.cs" />
<Compile Include="Gateway\When_using_a_custom_correlation_id.cs" />
Expand Down
47 changes: 3 additions & 44 deletions src/NServiceBus.Core/ConfigureFaultsForwarder.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
namespace NServiceBus
{
using System.Configuration;
using Config;
using Faults;
using Faults.Forwarder;
using Logging;
using Settings;
using Utils;

/// <summary>
/// Contains extension methods to NServiceBus.Configure
Expand All @@ -27,45 +23,9 @@ public static Configure MessageForwardingInCaseOfFault(this Configure config)
return config;
}

ErrorQueue = Address.Undefined;

var section = Configure.GetConfigSection<MessageForwardingInCaseOfFaultConfig>();
if (section != null)
{
if (string.IsNullOrWhiteSpace(section.ErrorQueue))
{
throw new ConfigurationErrorsException(
"'MessageForwardingInCaseOfFaultConfig' configuration section is found but 'ErrorQueue' value is missing." +
"\n The following is an example for adding such a value to your app config: " +
"\n <MessageForwardingInCaseOfFaultConfig ErrorQueue=\"error\"/> \n");
}

Logger.Debug("Error queue retrieved from <MessageForwardingInCaseOfFaultConfig> element in config file.");

ErrorQueue = Address.Parse(section.ErrorQueue);

config.Configurer.ConfigureComponent<FaultManager>(DependencyLifecycle.InstancePerCall)
.ConfigureProperty(fm => fm.ErrorQueue, ErrorQueue);

return config;
}


var errorQueue = RegistryReader<string>.Read("ErrorQueue");
if (!string.IsNullOrWhiteSpace(errorQueue))
{
Logger.Debug("Error queue retrieved from registry settings.");
ErrorQueue = Address.Parse(errorQueue);

config.Configurer.ConfigureComponent<FaultManager>(DependencyLifecycle.InstancePerCall)
.ConfigureProperty(fm => fm.ErrorQueue, ErrorQueue);
}

if (ErrorQueue == Address.Undefined)
{
throw new ConfigurationErrorsException("Faults forwarding requires an error queue to be specified. Please add a 'MessageForwardingInCaseOfFaultConfig' section to your app.config" +
"\n or configure a global one using the powershell command: Set-NServiceBusLocalMachineSettings -ErrorQueue {address of error queue}");
}
ErrorQueue = config.GetConfiguredErrorQueue();
config.Configurer.ConfigureComponent<FaultManager>(DependencyLifecycle.InstancePerCall)
.ConfigureProperty(fm => fm.ErrorQueue, ErrorQueue);

return config;
}
Expand All @@ -75,7 +35,6 @@ public static Configure MessageForwardingInCaseOfFault(this Configure config)
/// </summary>
public static Address ErrorQueue { get; private set; }

static readonly ILog Logger = LogManager.GetLogger(typeof(ConfigureFaultsForwarder));
}

class Bootstrapper : INeedInitialization
Expand Down

0 comments on commit 38b5b3c

Please sign in to comment.