From c7810dba45986bcd3021a3406ad62400fa95463b Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Mon, 29 Jun 2020 23:25:30 -0600 Subject: [PATCH 01/15] Add support for SendsAtomicWithReceive transport transaction mode --- .../FunctionEndpoint.cs | 114 ++++++++++++++---- .../MessageReceiverExtensions.cs | 28 +++++ 2 files changed, 116 insertions(+), 26 deletions(-) create mode 100644 src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index e4c9c38f..3afbb5cf 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -3,10 +3,12 @@ using System; using System.Threading; using System.Threading.Tasks; + using System.Transactions; using AzureFunctions; using AzureFunctions.ServiceBus; using Extensibility; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; using Microsoft.Extensions.Logging; using Transport; using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext; @@ -27,49 +29,109 @@ public FunctionEndpoint(Func /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. /// - public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null) + public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null, MessageReceiver messageReceiver = null) { FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); - var messageContext = CreateMessageContext(message); + // TODO: get the transaction mode the endpoint is configured with + var transportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive; + var useTransaction = messageReceiver != null && transportTransactionMode == TransportTransactionMode.SendsAtomicWithReceive; + var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); try { - await Process(messageContext, functionExecutionContext).ConfigureAwait(false); + var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey); + var messageContext = CreateMessageContext(message, transportTransaction); + + using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null) + { + await Process(messageContext, functionExecutionContext).ConfigureAwait(false); + + // Azure Functions auto-completion would be disabled if we try to run in SendsAtomicWithReceive, need to complete message manually + if (useTransaction) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + } + + scope?.Complete(); + } } catch (Exception exception) { - var errorContext = new ErrorContext( - exception, - message.GetHeaders(), - messageContext.MessageId, - messageContext.Body, - new TransportTransaction(), - message.SystemProperties.DeliveryCount); - - var errorHandleResult = await ProcessFailedMessage(errorContext, functionExecutionContext) - .ConfigureAwait(false); - - if (errorHandleResult == ErrorHandleResult.Handled) + try + { + ErrorHandleResult result; + + using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null) + { + var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey); + var errorContext = CreateErrorContext(exception, message, transportTransaction, message.SystemProperties.DeliveryCount); + + result = await ProcessFailedMessage(errorContext, functionExecutionContext).ConfigureAwait(false); + + if (result == ErrorHandleResult.Handled) + { + await messageReceiver.SafeCompleteAsync(transportTransactionMode, message.SystemProperties.LockToken).ConfigureAwait(false); + } + + scope?.Complete(); + } + + if (result == ErrorHandleResult.RetryRequired) + { + await messageReceiver.SafeAbandonAsync(transportTransactionMode, message.SystemProperties.LockToken).ConfigureAwait(false); + } + } + catch (Exception onErrorException) when (onErrorException is MessageLockLostException || onErrorException is ServiceBusTimeoutException) { - // return to signal to the Functions host it can complete the incoming message - return; + functionExecutionContext.Logger.LogDebug("Failed to execute recoverability.", onErrorException); } + catch (Exception onErrorException) + { + functionExecutionContext.Logger.LogCritical($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorException); - throw; + await messageReceiver.SafeAbandonAsync(transportTransactionMode, message.SystemProperties.LockToken).ConfigureAwait(false); + } } + } + + static MessageContext CreateMessageContext(Message originalMessage, TransportTransaction transportTransaction) + { + var contextBag = new ContextBag(); + contextBag.Set(originalMessage); + + return new MessageContext( + originalMessage.GetMessageId(), + originalMessage.GetHeaders(), + originalMessage.Body, + transportTransaction, + new CancellationTokenSource(), + contextBag); + } - MessageContext CreateMessageContext(Message originalMessage) + static ErrorContext CreateErrorContext(Exception exception, Message originalMessage, TransportTransaction transportTransaction, int immediateProcessingFailures) + { + return new ErrorContext( + exception, + originalMessage.GetHeaders(), + originalMessage.GetMessageId(), + originalMessage.Body, + transportTransaction, + immediateProcessingFailures); + } + + static TransportTransaction CreateTransportTransaction(bool useTransaction, MessageReceiver messageReceiver, string incomingQueuePartitionKey) + { + var transportTransaction = new TransportTransaction(); + + if (useTransaction) { - return new MessageContext( - originalMessage.GetMessageId(), - originalMessage.GetHeaders(), - originalMessage.Body, - new TransportTransaction(), - new CancellationTokenSource(), - new ContextBag()); + transportTransaction.Set((messageReceiver.ServiceBusConnection, messageReceiver.Path)); + transportTransaction.Set("IncomingQueue.PartitionKey", incomingQueuePartitionKey); } + + return transportTransaction; } } } diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs new file mode 100644 index 00000000..711bf5ca --- /dev/null +++ b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs @@ -0,0 +1,28 @@ +namespace NServiceBus.AzureFunctions.ServiceBus +{ + using System.Threading.Tasks; + using Microsoft.Azure.ServiceBus.Core; + + internal static class MessageReceiverExtensions + { + public static Task SafeCompleteAsync(this MessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + return messageReceiver.CompleteAsync(lockToken); + } + + return Task.CompletedTask; + } + + public static Task SafeAbandonAsync(this MessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + return messageReceiver.AbandonAsync(lockToken); + } + + return Task.CompletedTask; + } + } +} \ No newline at end of file From a21edbcdfde482b30cd907ce2a23d43a6736510e Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Mon, 29 Jun 2020 23:36:52 -0600 Subject: [PATCH 02/15] Read the body correctly to support messages sent by the legacy ASB transport --- .../MessageExtensions.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs index 20409307..22c2c78f 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.InteropExtensions; static class MessageExtensions { @@ -40,5 +41,15 @@ public static string GetMessageId(this Message message) return message.MessageId; } + + public static byte[] GetBody(this Message message) + { + if (message.UserProperties.TryGetValue("NServiceBus.Transport.Encoding", out var value) && value.Equals("wcf/byte-array")) + { + return message.GetBody() ?? Array.Empty(); + } + + return message.Body ?? Array.Empty(); + } } } \ No newline at end of file From fdfc78576eccd17bc21ae14436fbb0e0c9594765 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Mon, 29 Jun 2020 23:37:50 -0600 Subject: [PATCH 03/15] Dead-letter poisonous messages --- .../FunctionEndpoint.cs | 53 ++++++++++++++----- .../MessageReceiverExtensions.cs | 10 ++++ 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index 3afbb5cf..17e09bb9 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -1,6 +1,7 @@ namespace NServiceBus { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using System.Transactions; @@ -39,10 +40,35 @@ public async Task Process(Message message, ExecutionContext executionContext, IL var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); + var lockToken = message.SystemProperties.LockToken; + string messageId; + Dictionary headers; + byte[] body; + + try + { + messageId = message.GetMessageId(); + headers = message.GetHeaders(); + body = message.GetBody(); + } + catch (Exception exception) + { + try + { + await messageReceiver.SafeDeadLetterAsync(transportTransactionMode, lockToken, deadLetterReason: "Poisoned message", deadLetterErrorDescription: exception.Message).ConfigureAwait(false); + } + catch (Exception) + { + // nothing we can do about it, message will be retried + } + + return; + } + try { var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey); - var messageContext = CreateMessageContext(message, transportTransaction); + var messageContext = CreateMessageContext(message, messageId, headers, body, transportTransaction); using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null) { @@ -66,13 +92,14 @@ public async Task Process(Message message, ExecutionContext executionContext, IL using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null) { var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey); - var errorContext = CreateErrorContext(exception, message, transportTransaction, message.SystemProperties.DeliveryCount); + // provide an unmodified copy of the headers + var errorContext = CreateErrorContext(exception, messageId, message.GetHeaders(), body, transportTransaction, message.SystemProperties.DeliveryCount); result = await ProcessFailedMessage(errorContext, functionExecutionContext).ConfigureAwait(false); if (result == ErrorHandleResult.Handled) { - await messageReceiver.SafeCompleteAsync(transportTransactionMode, message.SystemProperties.LockToken).ConfigureAwait(false); + await messageReceiver.SafeCompleteAsync(transportTransactionMode, lockToken).ConfigureAwait(false); } scope?.Complete(); @@ -80,7 +107,7 @@ public async Task Process(Message message, ExecutionContext executionContext, IL if (result == ErrorHandleResult.RetryRequired) { - await messageReceiver.SafeAbandonAsync(transportTransactionMode, message.SystemProperties.LockToken).ConfigureAwait(false); + await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false); } } catch (Exception onErrorException) when (onErrorException is MessageLockLostException || onErrorException is ServiceBusTimeoutException) @@ -91,32 +118,32 @@ public async Task Process(Message message, ExecutionContext executionContext, IL { functionExecutionContext.Logger.LogCritical($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorException); - await messageReceiver.SafeAbandonAsync(transportTransactionMode, message.SystemProperties.LockToken).ConfigureAwait(false); + await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false); } } } - static MessageContext CreateMessageContext(Message originalMessage, TransportTransaction transportTransaction) + static MessageContext CreateMessageContext(Message originalMessage, string messageId, Dictionary headers, byte[] body, TransportTransaction transportTransaction) { var contextBag = new ContextBag(); contextBag.Set(originalMessage); return new MessageContext( - originalMessage.GetMessageId(), - originalMessage.GetHeaders(), - originalMessage.Body, + messageId, + headers, + body, transportTransaction, new CancellationTokenSource(), contextBag); } - static ErrorContext CreateErrorContext(Exception exception, Message originalMessage, TransportTransaction transportTransaction, int immediateProcessingFailures) + static ErrorContext CreateErrorContext(Exception exception, string messageId, Dictionary headers, byte[] body, TransportTransaction transportTransaction, int immediateProcessingFailures) { return new ErrorContext( exception, - originalMessage.GetHeaders(), - originalMessage.GetMessageId(), - originalMessage.Body, + headers, + messageId, + body, transportTransaction, immediateProcessingFailures); } diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs index 711bf5ca..af4040ac 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs @@ -24,5 +24,15 @@ public static Task SafeAbandonAsync(this MessageReceiver messageReceiver, Transp return Task.CompletedTask; } + + public static Task SafeDeadLetterAsync(this MessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken, string deadLetterReason, string deadLetterErrorDescription) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + return messageReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription); + } + + return Task.CompletedTask; + } } } \ No newline at end of file From dc99f2830ae1941a3604ff6786cf673748374d84 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Mon, 29 Jun 2020 23:46:26 -0600 Subject: [PATCH 04/15] Approve API to change FunctionEndpoint public signature to allow message receiver optional parameter --- .../ApprovalFiles/APIApprovals.Approve.approved.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 465777e8..b54ec758 100644 --- a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -26,7 +26,7 @@ namespace NServiceBus public class FunctionEndpoint : NServiceBus.AzureFunctions.ServerlessEndpoint { public FunctionEndpoint(System.Func configurationFactory) { } - public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null, Microsoft.Azure.ServiceBus.Core.MessageReceiver messageReceiver = null) { } } public class FunctionExecutionContext { From 812c9e05a77642c9fc1d1ebefc6494f25e9f4c04 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 00:31:11 -0600 Subject: [PATCH 05/15] Switch to an interface to allow faking message receiver --- .../FunctionEndpoint.cs | 4 +- .../MessageReceiverExtensions.cs | 8 +- .../APIApprovals.Approve.approved.txt | 2 +- .../FunctionEndpointComponent.cs | 95 ++++++++++++++++++- 4 files changed, 99 insertions(+), 10 deletions(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index 17e09bb9..b2c5bd45 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -30,7 +30,7 @@ public FunctionEndpoint(Func /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. /// - public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null, MessageReceiver messageReceiver = null) + public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null, IMessageReceiver messageReceiver = null) { FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); @@ -148,7 +148,7 @@ static ErrorContext CreateErrorContext(Exception exception, string messageId, Di immediateProcessingFailures); } - static TransportTransaction CreateTransportTransaction(bool useTransaction, MessageReceiver messageReceiver, string incomingQueuePartitionKey) + static TransportTransaction CreateTransportTransaction(bool useTransaction, IClientEntity messageReceiver, string incomingQueuePartitionKey) { var transportTransaction = new TransportTransaction(); diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs index af4040ac..6e1d56fd 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs @@ -3,9 +3,9 @@ using System.Threading.Tasks; using Microsoft.Azure.ServiceBus.Core; - internal static class MessageReceiverExtensions + static class MessageReceiverExtensions { - public static Task SafeCompleteAsync(this MessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) + public static Task SafeCompleteAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) { if (transportTransactionMode != TransportTransactionMode.None) { @@ -15,7 +15,7 @@ public static Task SafeCompleteAsync(this MessageReceiver messageReceiver, Trans return Task.CompletedTask; } - public static Task SafeAbandonAsync(this MessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) + public static Task SafeAbandonAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) { if (transportTransactionMode != TransportTransactionMode.None) { @@ -25,7 +25,7 @@ public static Task SafeAbandonAsync(this MessageReceiver messageReceiver, Transp return Task.CompletedTask; } - public static Task SafeDeadLetterAsync(this MessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken, string deadLetterReason, string deadLetterErrorDescription) + public static Task SafeDeadLetterAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken, string deadLetterReason, string deadLetterErrorDescription) { if (transportTransactionMode != TransportTransactionMode.None) { diff --git a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index b54ec758..cd78a7a3 100644 --- a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -26,7 +26,7 @@ namespace NServiceBus public class FunctionEndpoint : NServiceBus.AzureFunctions.ServerlessEndpoint { public FunctionEndpoint(System.Func configurationFactory) { } - public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null, Microsoft.Azure.ServiceBus.Core.MessageReceiver messageReceiver = null) { } + public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver = null) { } } public class FunctionExecutionContext { diff --git a/src/ServiceBus.Tests/FunctionEndpointComponent.cs b/src/ServiceBus.Tests/FunctionEndpointComponent.cs index b319e93e..b1c49f3d 100644 --- a/src/ServiceBus.Tests/FunctionEndpointComponent.cs +++ b/src/ServiceBus.Tests/FunctionEndpointComponent.cs @@ -8,6 +8,8 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; + using Microsoft.Extensions.Logging; using NServiceBus; using NServiceBus.AcceptanceTesting; using NServiceBus.AcceptanceTesting.Customization; @@ -32,8 +34,8 @@ public Task CreateRunner(RunDescriptor runDescriptor) { return Task.FromResult( new FunctionRunner( - Messages, - CustomizeConfiguration, + Messages, + CustomizeConfiguration, runDescriptor.ScenarioContext, GetType())); } @@ -107,7 +109,7 @@ public override async Task ComponentsStarted(CancellationToken token) { var transportMessage = GenerateMessage(message); var context = new ExecutionContext(); - await endpoint.Process(transportMessage, context); + await endpoint.Process(transportMessage, context, new FakeLogger(), new FakeMessageReceiver()); } } @@ -153,5 +155,92 @@ Message GenerateMessage(object message) FunctionEndpoint endpoint; IMessageSerializer messageSerializer; } + + class FakeLogger : ILogger + { + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + + } + + public bool IsEnabled(LogLevel logLevel) + { + return false; + } + + public IDisposable BeginScope(TState state) + { + return null; + } + } + + class FakeMessageReceiver : IReceiverClient, IMessageReceiver + { + public Task CloseAsync() => Task.CompletedTask; + + public void RegisterPlugin(ServiceBusPlugin serviceBusPlugin) + { + } + + public void UnregisterPlugin(string serviceBusPluginName) + { + } + + public string ClientId { get; } + public bool IsClosedOrClosing { get; } + public string Path { get; } + public TimeSpan OperationTimeout { get; set; } + public ServiceBusConnection ServiceBusConnection { get; } + public bool OwnsConnection { get; } + public IList RegisteredPlugins { get; } + + public void RegisterMessageHandler(Func handler, Func exceptionReceivedHandler) + { + } + + public void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions) + { + } + + public Task CompleteAsync(string lockToken) => Task.CompletedTask; + + public Task AbandonAsync(string lockToken, IDictionary propertiesToModify = null) => Task.CompletedTask; + + public Task DeadLetterAsync(string lockToken, IDictionary propertiesToModify = null) => Task.CompletedTask; + + public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) => Task.CompletedTask; + + public int PrefetchCount { get; set; } + public ReceiveMode ReceiveMode { get; } + public Task ReceiveAsync() => throw new NotImplementedException(); + + public Task ReceiveAsync(TimeSpan operationTimeout) => throw new NotImplementedException(); + + public Task> ReceiveAsync(int maxMessageCount) => throw new NotImplementedException(); + + public Task> ReceiveAsync(int maxMessageCount, TimeSpan operationTimeout) => throw new NotImplementedException(); + + public Task ReceiveDeferredMessageAsync(long sequenceNumber) => throw new NotImplementedException(); + + public Task> ReceiveDeferredMessageAsync(IEnumerable sequenceNumbers) => throw new NotImplementedException(); + + public Task CompleteAsync(IEnumerable lockTokens) => throw new NotImplementedException(); + + public Task DeferAsync(string lockToken, IDictionary propertiesToModify = null) => throw new NotImplementedException(); + + public Task RenewLockAsync(Message message) => throw new NotImplementedException(); + + public Task RenewLockAsync(string lockToken) => throw new NotImplementedException(); + + public Task PeekAsync() => throw new NotImplementedException(); + + public Task> PeekAsync(int maxMessageCount) => throw new NotImplementedException(); + + public Task PeekBySequenceNumberAsync(long fromSequenceNumber) => throw new NotImplementedException(); + + public Task> PeekBySequenceNumberAsync(long fromSequenceNumber, int messageCount) => throw new NotImplementedException(); + + public long LastPeekedSequenceNumber { get; } + } } } \ No newline at end of file From 922ade2c0c50867a0c954bc7ec6f95d9fd118f1b Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 00:33:02 -0600 Subject: [PATCH 06/15] Fix failing tests --- .../FunctionEndpoint.cs | 7 +++++++ .../When_shipping_handlers_in_dedicated_assembly.cs | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index b2c5bd45..31c19f70 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -108,17 +108,24 @@ public async Task Process(Message message, ExecutionContext executionContext, IL if (result == ErrorHandleResult.RetryRequired) { await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false); + + // Indicate to the Functions runtime not to complete the incoming message + throw; } } catch (Exception onErrorException) when (onErrorException is MessageLockLostException || onErrorException is ServiceBusTimeoutException) { functionExecutionContext.Logger.LogDebug("Failed to execute recoverability.", onErrorException); + + throw; } catch (Exception onErrorException) { functionExecutionContext.Logger.LogCritical($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorException); await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false); + + throw; } } } diff --git a/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs b/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs index e221ce56..1ff58dce 100644 --- a/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs +++ b/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs @@ -62,6 +62,17 @@ Message GenerateMessage() var message = new Message(bytes); message.UserProperties["NServiceBus.EnclosedMessageTypes"] = "Testing.Handlers.DummyMessage"; + var systemProperties = new Message.SystemPropertiesCollection(); + // sequence number is required to prevent SystemPropertiesCollection from throwing on the getters + var fieldInfo = typeof(Message.SystemPropertiesCollection).GetField("sequenceNumber", BindingFlags.NonPublic | BindingFlags.Instance); + fieldInfo.SetValue(systemProperties, 123); + // set delivery count to 1 + var deliveryCountProperty = typeof(Message.SystemPropertiesCollection).GetProperty("DeliveryCount"); + deliveryCountProperty.SetValue(systemProperties, 1); + // assign test message mocked system properties + var property = typeof(Message).GetProperty("SystemProperties"); + property.SetValue(message, systemProperties); + return message; } } From 0e32881012cfff079adea86b6e3cab8c6ac4103c Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 00:38:35 -0600 Subject: [PATCH 07/15] Fix inspection error --- src/ServiceBus.Tests/FunctionEndpointComponent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceBus.Tests/FunctionEndpointComponent.cs b/src/ServiceBus.Tests/FunctionEndpointComponent.cs index b1c49f3d..a8ab738f 100644 --- a/src/ServiceBus.Tests/FunctionEndpointComponent.cs +++ b/src/ServiceBus.Tests/FunctionEndpointComponent.cs @@ -174,7 +174,7 @@ public IDisposable BeginScope(TState state) } } - class FakeMessageReceiver : IReceiverClient, IMessageReceiver + class FakeMessageReceiver : IMessageReceiver { public Task CloseAsync() => Task.CompletedTask; From dedc7ae7e6a8155f2b85bf18205d305d3c469887 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 14:16:12 -0600 Subject: [PATCH 08/15] Fix namespace --- .../TransportWrapper/ServerlessTransportInfrastructure.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index 5e7ac656..b331d2f2 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Collections.Generic; From 194ad8eb73ba99a09f3a35fa343a777051f78873 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 14:23:17 -0600 Subject: [PATCH 09/15] Suppress ASB transaction to avoid promotions from unit of work (handlers) --- .../TransportWrapper/ServerlessTransport.cs | 2 ++ .../ServerlessTransportInfrastructure.cs | 13 +++---- .../TransactionScopeSuppressBehavior.cs | 35 +++++++++++++++++++ .../TransactionScopeSuppressFeature.cs | 12 +++++++ 4 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs create mode 100644 src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs index 8dae6ef2..bac31618 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs @@ -1,5 +1,6 @@ namespace NServiceBus.AzureFunctions { + using ServiceBus; using Settings; using Transport; @@ -18,6 +19,7 @@ public ServerlessTransport() public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString) { var baseTransportInfrastructure = baseTransport.Initialize(settings, connectionString); + return new ServerlessTransportInfrastructure(baseTransportInfrastructure, settings); } diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index b331d2f2..30a26547 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -2,31 +2,32 @@ { using System; using System.Collections.Generic; + using Features; using Routing; using Settings; using Transport; class ServerlessTransportInfrastructure : TransportInfrastructure { - public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, - SettingsHolder settings) + public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, SettingsHolder settings) { this.baseTransportInfrastructure = baseTransportInfrastructure; this.settings = settings; + + settings.EnableFeatureByDefault(); } - public override IEnumerable DeliveryConstraints => - baseTransportInfrastructure.DeliveryConstraints; + public override IEnumerable DeliveryConstraints => baseTransportInfrastructure.DeliveryConstraints; //support ReceiveOnly so that we can use immediate retries public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.ReceiveOnly; - public override OutboundRoutingPolicy OutboundRoutingPolicy => - baseTransportInfrastructure.OutboundRoutingPolicy; + public override OutboundRoutingPolicy OutboundRoutingPolicy => baseTransportInfrastructure.OutboundRoutingPolicy; public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() { var pipelineInvoker = settings.GetOrCreate(); + return new ManualPipelineInvocationInfrastructure(pipelineInvoker); } diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs new file mode 100644 index 00000000..60f2c5f9 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs @@ -0,0 +1,35 @@ +namespace NServiceBus.AzureFunctions.ServiceBus +{ + using System; + using System.Threading.Tasks; + using System.Transactions; + using Pipeline; + + class TransactionScopeSuppressBehavior : IBehavior + { + public async Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + if (Transaction.Current != null) + { + using (var tx = new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) + { + await next(context).ConfigureAwait(false); + + tx.Complete(); + } + } + else + { + await next(context).ConfigureAwait(false); + } + } + + public class Registration : RegisterStep + { + public Registration() : base("HandlerTransactionScopeSuppressWrapper", typeof(TransactionScopeSuppressBehavior), "Makes sure that the handlers gets wrapped in a suppress transaction scope, preventing the ASB transaction scope from promoting") + { + InsertBefore("ExecuteUnitOfWork"); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs new file mode 100644 index 00000000..bdbf4b1e --- /dev/null +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs @@ -0,0 +1,12 @@ +namespace NServiceBus.AzureFunctions.ServiceBus +{ + using Features; + + class TransactionScopeSuppressFeature : Feature + { + protected override void Setup(FeatureConfigurationContext context) + { + context.Pipeline.Register(new TransactionScopeSuppressBehavior.Registration()); + } + } +} \ No newline at end of file From 820463078a5cb6b9f97794d77602742fbf8ba2e9 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 14:27:32 -0600 Subject: [PATCH 10/15] Up the transport transaction mode to indicate support for SendsAtomicWithReceive --- .../TransportWrapper/ServerlessTransportInfrastructure.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index 30a26547..0ff7972e 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -19,8 +19,7 @@ public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportIn public override IEnumerable DeliveryConstraints => baseTransportInfrastructure.DeliveryConstraints; - //support ReceiveOnly so that we can use immediate retries - public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.ReceiveOnly; + public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.SendsAtomicWithReceive; public override OutboundRoutingPolicy OutboundRoutingPolicy => baseTransportInfrastructure.OutboundRoutingPolicy; From 723a65d63d729d43bb75642f50c2311e3b83250e Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 14:33:10 -0600 Subject: [PATCH 11/15] Remove unnecessary generic --- .../Serverless/TransportWrapper/ServerlessTransport.cs | 2 +- .../TransportWrapper/ServerlessTransportInfrastructure.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs index bac31618..96a60a6e 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs @@ -20,7 +20,7 @@ public override TransportInfrastructure Initialize(SettingsHolder settings, stri { var baseTransportInfrastructure = baseTransport.Initialize(settings, connectionString); - return new ServerlessTransportInfrastructure(baseTransportInfrastructure, settings); + return new ServerlessTransportInfrastructure(baseTransportInfrastructure, settings); } readonly TBaseTransport baseTransport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index 0ff7972e..a5eba8df 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -7,7 +7,7 @@ using Settings; using Transport; - class ServerlessTransportInfrastructure : TransportInfrastructure + class ServerlessTransportInfrastructure : TransportInfrastructure { public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, SettingsHolder settings) { From 69d3c0eeb73d14db6d2f1749217141af0efb7394 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 14:36:43 -0600 Subject: [PATCH 12/15] Remove changes to suppress ASB transaction (already set in place by the ASB transport) --- .../ServerlessTransportInfrastructure.cs | 3 -- .../TransactionScopeSuppressBehavior.cs | 35 ------------------- .../TransactionScopeSuppressFeature.cs | 12 ------- 3 files changed, 50 deletions(-) delete mode 100644 src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs delete mode 100644 src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index a5eba8df..d8a9f612 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using Features; using Routing; using Settings; using Transport; @@ -13,8 +12,6 @@ public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportIn { this.baseTransportInfrastructure = baseTransportInfrastructure; this.settings = settings; - - settings.EnableFeatureByDefault(); } public override IEnumerable DeliveryConstraints => baseTransportInfrastructure.DeliveryConstraints; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs deleted file mode 100644 index 60f2c5f9..00000000 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressBehavior.cs +++ /dev/null @@ -1,35 +0,0 @@ -namespace NServiceBus.AzureFunctions.ServiceBus -{ - using System; - using System.Threading.Tasks; - using System.Transactions; - using Pipeline; - - class TransactionScopeSuppressBehavior : IBehavior - { - public async Task Invoke(IIncomingPhysicalMessageContext context, Func next) - { - if (Transaction.Current != null) - { - using (var tx = new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) - { - await next(context).ConfigureAwait(false); - - tx.Complete(); - } - } - else - { - await next(context).ConfigureAwait(false); - } - } - - public class Registration : RegisterStep - { - public Registration() : base("HandlerTransactionScopeSuppressWrapper", typeof(TransactionScopeSuppressBehavior), "Makes sure that the handlers gets wrapped in a suppress transaction scope, preventing the ASB transaction scope from promoting") - { - InsertBefore("ExecuteUnitOfWork"); - } - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs deleted file mode 100644 index bdbf4b1e..00000000 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/TransactionScopeSuppressFeature.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace NServiceBus.AzureFunctions.ServiceBus -{ - using Features; - - class TransactionScopeSuppressFeature : Feature - { - protected override void Setup(FeatureConfigurationContext context) - { - context.Pipeline.Register(new TransactionScopeSuppressBehavior.Registration()); - } - } -} \ No newline at end of file From 2dda2847809633502aa441fac431ca414d1a463d Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 14:44:20 -0600 Subject: [PATCH 13/15] Fix namespaces. Only 3 types should be at the NServiceBus scope: FunctionEndpoint, FunctionExecutionContext, and ServiceBusTriggeredEndpointConfiguration --- .../AttributeDiscoverer.cs | 2 +- .../FunctionEndpoint.cs | 1 - .../Logging/FunctionsLoggerFactory.cs | 2 +- .../Logging/Logger.cs | 2 +- .../Configuration/ServerlessTransportExtensions.cs | 2 +- .../Recoverability/ServerlessRecoverabilityPolicy.cs | 2 +- .../Serverless/ServerlessEndpoint.cs | 2 +- .../Serverless/ServerlessEndpointConfiguration.cs | 2 +- .../ManualPipelineInvocationInfrastructure.cs | 2 +- .../Serverless/TransportWrapper/NoOpQueueCreator.cs | 2 +- .../Serverless/TransportWrapper/PipelineInvoker.cs | 2 +- .../Serverless/TransportWrapper/ServerlessTransport.cs | 3 +-- .../ServiceBusTriggeredEndpointConfiguration.cs | 2 +- .../ApprovalFiles/APIApprovals.Approve.approved.txt | 8 ++++---- 14 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs b/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs index 48f65b7d..cedda6a7 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Diagnostics; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index 31c19f70..7f9e5840 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -5,7 +5,6 @@ using System.Threading; using System.Threading.Tasks; using System.Transactions; - using AzureFunctions; using AzureFunctions.ServiceBus; using Extensibility; using Microsoft.Azure.ServiceBus; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs index 4b3b30ff..1b59a06a 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Threading; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs index 59b5290d..6ec720c5 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Threading; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs index 0b3a9e29..103aad43 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using Configuration.AdvancedExtensibility; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs index 72f4c1dd..dd2135e3 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs index f0128920..2f207056 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.IO; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs index 194ae3e1..e14b2235 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Security.Cryptography; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs index 56a826c3..35432bb4 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System.Threading.Tasks; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs index 80b5bd62..5cd6f68b 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System.Threading.Tasks; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs index 9d37e265..2bb77c99 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Threading.Tasks; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs index 96a60a6e..1b5ea999 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs @@ -1,6 +1,5 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { - using ServiceBus; using Settings; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs b/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs index 6695f57e..4959f3c2 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs @@ -3,7 +3,7 @@ using Logging; using Microsoft.Azure.WebJobs; using System; - using AzureFunctions; + using AzureFunctions.ServiceBus; /// /// Represents a serverless NServiceBus endpoint running within an AzureServiceBus trigger. diff --git a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index cd78a7a3..47a4f03d 100644 --- a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -1,5 +1,5 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"ServiceBus.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { public abstract class ServerlessEndpointConfiguration { @@ -12,7 +12,7 @@ namespace NServiceBus.AzureFunctions where TTransport : NServiceBus.Transport.TransportDefinition, new () { } } public abstract class ServerlessEndpoint - where TConfiguration : NServiceBus.AzureFunctions.ServerlessEndpointConfiguration + where TConfiguration : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpointConfiguration { protected System.Func AssemblyDirectoryResolver; protected ServerlessEndpoint(System.Func configurationFactory) { } @@ -23,7 +23,7 @@ namespace NServiceBus.AzureFunctions } namespace NServiceBus { - public class FunctionEndpoint : NServiceBus.AzureFunctions.ServerlessEndpoint + public class FunctionEndpoint : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpoint { public FunctionEndpoint(System.Func configurationFactory) { } public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver = null) { } @@ -34,7 +34,7 @@ namespace NServiceBus public Microsoft.Azure.WebJobs.ExecutionContext ExecutionContext { get; } public Microsoft.Extensions.Logging.ILogger Logger { get; } } - public class ServiceBusTriggeredEndpointConfiguration : NServiceBus.AzureFunctions.ServerlessEndpointConfiguration + public class ServiceBusTriggeredEndpointConfiguration : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpointConfiguration { public ServiceBusTriggeredEndpointConfiguration(string endpointName, string connectionStringName = null) { } public NServiceBus.TransportExtensions Transport { get; } From 968a4fbdcfc2d42e19f58cdddbbb9bca8b5a8a08 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 15:18:13 -0600 Subject: [PATCH 14/15] Determine wherever to use a transaction based on the message receiver availability --- .../FunctionEndpoint.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index 7f9e5840..a0003d09 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -33,9 +33,9 @@ public async Task Process(Message message, ExecutionContext executionContext, IL { FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); - // TODO: get the transaction mode the endpoint is configured with - var transportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive; - var useTransaction = messageReceiver != null && transportTransactionMode == TransportTransactionMode.SendsAtomicWithReceive; + // When the message receiver is provided, we assume TransportTransactionMode.SendsAtomicWithReceive to be used + var useTransaction = messageReceiver != null; + var transportTransactionMode = useTransaction ? TransportTransactionMode.SendsAtomicWithReceive : TransportTransactionMode.ReceiveOnly; var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); From 4b66477ae1aa4c6896b398a22e1d56b1b9d0b030 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 30 Jun 2020 15:18:35 -0600 Subject: [PATCH 15/15] Log when dead-lettering fails --- src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index a0003d09..e5aa160d 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -56,9 +56,10 @@ public async Task Process(Message message, ExecutionContext executionContext, IL { await messageReceiver.SafeDeadLetterAsync(transportTransactionMode, lockToken, deadLetterReason: "Poisoned message", deadLetterErrorDescription: exception.Message).ConfigureAwait(false); } - catch (Exception) + catch (Exception ex) { // nothing we can do about it, message will be retried + functionExecutionContext.Logger.LogDebug($"Failed to move a poisonous message with native ID: `{message.MessageId}` to the dead-letter queue. Message will be retried.", ex); } return;