Skip to content
This repository has been archived by the owner on Nov 23, 2021. It is now read-only.

Commit

Permalink
Raise critical error when onError throws an exception
Browse files Browse the repository at this point in the history
  • Loading branch information
boblangley committed Apr 17, 2019
1 parent a4f4a27 commit 4bb3305
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 14 deletions.
30 changes: 20 additions & 10 deletions src/Transport/Receiving/MessageReceiverNotifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ public MessageReceiverNotifier(ICreateMessageReceivers receiverCreator, IConvert
}

public bool IsRunning { get; set; }

public int RefCount { get; set; }
public void Initialize(EntityInfo entity, Func<IncomingMessageDetails, ReceiveContext, Task> callback, Func<Exception, Task> errorCallback, Func<ErrorContext, Task<ErrorHandleResult>> processingFailureCallback, int maximumConcurrency)

public void Initialize(EntityInfo entity, Func<IncomingMessageDetails, ReceiveContext, Task> callback, Func<Exception, Task> errorCallback, Func<ErrorContext, Task<ErrorHandleResult>> processingFailureCallback, CriticalError criticalError, int maximumConcurrency)
{
receiveMode = settings.Get<ReceiveMode>(WellKnownConfigurationKeys.Connectivity.MessageReceivers.ReceiveMode);

incomingCallback = callback;
this.errorCallback = errorCallback ?? EmptyErrorCallback;
this.processingFailureCallback = processingFailureCallback;
this.criticalError = criticalError;
this.entity = entity;

fullPath = entity.Path;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void Start()
{
if (Interlocked.Increment(ref refCount) == 1)
{
StartInternal();
StartInternal();
}
}

Expand Down Expand Up @@ -113,7 +114,7 @@ void StartInternal()
} while (attempt < 2);
});
if (exceptions.Count > 0) throw new AggregateException(exceptions);

IsRunning = true;
}

Expand Down Expand Up @@ -277,14 +278,22 @@ async Task ProcessMessage(IMessageReceiver internalReceiver, BrokeredMessage mes
transportTransaction.Set(context);
var errorContext = new ErrorContext(exception, incomingMessage.Headers, incomingMessage.MessageId, incomingMessage.Body, transportTransaction, message.DeliveryCount);

var result = await processingFailureCallback(errorContext).ConfigureAwait(false);
if (result == ErrorHandleResult.RetryRequired)
try
{
await Abandon(message, exception).ConfigureAwait(false);
var result = await processingFailureCallback(errorContext).ConfigureAwait(false);
if (result == ErrorHandleResult.RetryRequired)
{
await Abandon(message, exception).ConfigureAwait(false);
}
else
{
await HandleCompletion(message, context, completionCanBeBatched, slotNumber).ConfigureAwait(false);
}
}
else
catch (Exception ex)
{
await HandleCompletion(message, context, completionCanBeBatched, slotNumber).ConfigureAwait(false);
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", ex);
await Abandon(message, exception).ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -378,5 +387,6 @@ static Task EmptyErrorCallback(Exception exception)
bool wrapInScope;
bool completionCanBeBatched;
static ILog logger = LogManager.GetLogger<MessageReceiverNotifier>();
CriticalError criticalError;
}
}
1 change: 1 addition & 0 deletions src/Transport/Seam/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Task Init(Func<MessageContext, Task> pump, Func<ErrorContext, Task<ErrorH

topologyOperator.OnError(exception => circuitBreaker.Failure(exception));
topologyOperator.OnProcessingFailure(onError);
container.Register(criticalError);

return TaskEx.Completed;
}
Expand Down
6 changes: 4 additions & 2 deletions src/Transport/Topology/TopologyOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void Start(TopologySection topologySection, int maximumConcurrency)
topology = topologySection;

StartNotifiersFor(topology.Entities);

running = true;

Action operation;
Expand Down Expand Up @@ -74,6 +74,8 @@ public void OnProcessingFailure(Func<ErrorContext, Task<ErrorHandleResult>> func

void StartNotifiersFor(IEnumerable<EntityInfo> entities)
{
var criticalError = container.Resolve<CriticalError>();

foreach (var entity in entities)
{
if (!entity.ShouldBeListenedTo)
Expand All @@ -82,7 +84,7 @@ void StartNotifiersFor(IEnumerable<EntityInfo> entities)
var notifier = notifiers.GetOrAdd(entity, e =>
{
var n = CreateNotifier(entity.Type);
n.Initialize(e, onMessage, onError, onProcessingFailure, maxConcurrency);
n.Initialize(e, onMessage, onError, onProcessingFailure, criticalError, maxConcurrency);
return n;
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace NServiceBus.TransportTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NUnit.Framework;
using Transport;

Expand All @@ -13,15 +15,23 @@ public class When_on_error_throws : NServiceBusTransportTest
[TestCase(TransportTransactionMode.TransactionScope)]
public async Task Should_reinvoke_on_error_with_original_exception(TransportTransactionMode transactionMode)
{
var loggerFactory = new TransportTestLoggerFactory();
LogManager.UseFactory(loggerFactory);

var onErrorCalled = new TaskCompletionSource<ErrorContext>();
var criticalErrorCalled = false;
string criticalErrorMessage = null;

OnTestTimeout(() => onErrorCalled.SetCanceled());

var firstInvocation = true;
string nativeMessageId = null;

await StartPump(
context =>
{
nativeMessageId = context.MessageId;
throw new Exception("Simulated exception");
},
context =>
Expand All @@ -37,13 +47,21 @@ await StartPump(
return Task.FromResult(ErrorHandleResult.Handled);
},
transactionMode);
transactionMode,
(message, exception) =>
{
criticalErrorCalled = true;
criticalErrorMessage = message;
});

await SendMessage(InputQueueName);

var errorContext = await onErrorCalled.Task;

Assert.AreEqual("Simulated exception", errorContext.Exception.Message);
Assert.AreEqual("Simulated exception", errorContext.Exception.Message, "Should retry the message");
Assert.True(criticalErrorCalled, "Should invoke critical error");
Assert.AreEqual($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", criticalErrorMessage, "Incorrect critical error message.");
Assert.False(loggerFactory.LogItems.Any(item => item.Level > LogLevel.Info));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
<Compile Include="App_Packages\NSB.TransportTests.6.2.0\When_using_non_durable_delivery.cs" />
<Compile Include="ConfigureAzureServiceBusTransportInfrastructure.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TransportTestLoggerFactory.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
Expand Down
134 changes: 134 additions & 0 deletions src/TransportTests/TransportTestLoggerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
namespace NServiceBus.TransportTests
{
using System;
using System.Collections.Generic;
using Logging;
using NUnit.Framework;

public class TransportTestLoggerFactory : ILoggerFactory
{
public ILog GetLogger(Type type)
{
return GetLogger(type.FullName);
}

public ILog GetLogger(string name)
{
return new TransportTestLogger(name, LogItems);
}

public List<LogItem> LogItems { get; } = new List<LogItem>();

public class LogItem
{
// ReSharper disable NotAccessedField.Global
public LogLevel Level;
public string Message;
// ReSharper restore NotAccessedField.Global
}

class TransportTestLogger : ILog
{
public TransportTestLogger(string name, List<LogItem> logItems)
{
this.name = name;
this.logItems = logItems;
}

public bool IsDebugEnabled { get; } = true;
public bool IsInfoEnabled { get; } = true;
public bool IsWarnEnabled { get; } = true;
public bool IsErrorEnabled { get; } = true;
public bool IsFatalEnabled { get; } = true;

public void Debug(string message)
{
Log(LogLevel.Debug, message);
}

public void Debug(string message, Exception exception)
{
Log(LogLevel.Debug, $"{message} {exception}");
}

public void DebugFormat(string format, params object[] args)
{
Log(LogLevel.Debug, string.Format(format, args));
}

public void Info(string message)
{
Log(LogLevel.Info, message);
}

public void Info(string message, Exception exception)
{
Log(LogLevel.Info, $"{message} {exception}");
}

public void InfoFormat(string format, params object[] args)
{
Log(LogLevel.Info, string.Format(format, args));
}

public void Warn(string message)
{
Log(LogLevel.Warn, message);
}

public void Warn(string message, Exception exception)
{
Log(LogLevel.Warn, $"{message} {exception}");
}

public void WarnFormat(string format, params object[] args)
{
Log(LogLevel.Warn, string.Format(format, args));
}

public void Error(string message)
{
Log(LogLevel.Error, message);
}

public void Error(string message, Exception exception)
{
Log(LogLevel.Error, $"{message} {exception}");
}

public void ErrorFormat(string format, params object[] args)
{
Log(LogLevel.Error, string.Format(format, args));
}

public void Fatal(string message)
{
Log(LogLevel.Fatal, message);
}

public void Fatal(string message, Exception exception)
{
Log(LogLevel.Fatal, $"{message} {exception}");
}

public void FatalFormat(string format, params object[] args)
{
Log(LogLevel.Fatal, string.Format(format, args));
}

void Log(LogLevel level, string message)
{
logItems.Add(new LogItem
{
Level = level,
Message = message
});

TestContext.WriteLine($"{DateTime.Now:T} {level} {name}: {message}");
}

string name;
List<LogItem> logItems;
}
}
}

0 comments on commit 4bb3305

Please sign in to comment.