diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs b/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs index 48f65b7d..cedda6a7 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/AttributeDiscoverer.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Diagnostics; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index e4c9c38f..e5aa160d 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -1,12 +1,14 @@ namespace NServiceBus { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; - using AzureFunctions; + using System.Transactions; using AzureFunctions.ServiceBus; using Extensibility; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; using Microsoft.Extensions.Logging; using Transport; using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext; @@ -27,49 +29,143 @@ public FunctionEndpoint(Func /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. /// - public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null) + public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null, IMessageReceiver messageReceiver = null) { FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); - var messageContext = CreateMessageContext(message); + // When the message receiver is provided, we assume TransportTransactionMode.SendsAtomicWithReceive to be used + var useTransaction = messageReceiver != null; + var transportTransactionMode = useTransaction ? TransportTransactionMode.SendsAtomicWithReceive : TransportTransactionMode.ReceiveOnly; + var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); + var lockToken = message.SystemProperties.LockToken; + string messageId; + Dictionary headers; + byte[] body; + try { - await Process(messageContext, functionExecutionContext).ConfigureAwait(false); + messageId = message.GetMessageId(); + headers = message.GetHeaders(); + body = message.GetBody(); } catch (Exception exception) { - var errorContext = new ErrorContext( - exception, - message.GetHeaders(), - messageContext.MessageId, - messageContext.Body, - new TransportTransaction(), - message.SystemProperties.DeliveryCount); - - var errorHandleResult = await ProcessFailedMessage(errorContext, functionExecutionContext) - .ConfigureAwait(false); - - if (errorHandleResult == ErrorHandleResult.Handled) + try + { + await messageReceiver.SafeDeadLetterAsync(transportTransactionMode, lockToken, deadLetterReason: "Poisoned message", deadLetterErrorDescription: exception.Message).ConfigureAwait(false); + } + catch (Exception ex) + { + // nothing we can do about it, message will be retried + functionExecutionContext.Logger.LogDebug($"Failed to move a poisonous message with native ID: `{message.MessageId}` to the dead-letter queue. Message will be retried.", ex); + } + + return; + } + + try + { + var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey); + var messageContext = CreateMessageContext(message, messageId, headers, body, transportTransaction); + + using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null) + { + await Process(messageContext, functionExecutionContext).ConfigureAwait(false); + + // Azure Functions auto-completion would be disabled if we try to run in SendsAtomicWithReceive, need to complete message manually + if (useTransaction) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + } + + scope?.Complete(); + } + } + catch (Exception exception) + { + try + { + ErrorHandleResult result; + + using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null) + { + var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey); + // provide an unmodified copy of the headers + var errorContext = CreateErrorContext(exception, messageId, message.GetHeaders(), body, transportTransaction, message.SystemProperties.DeliveryCount); + + result = await ProcessFailedMessage(errorContext, functionExecutionContext).ConfigureAwait(false); + + if (result == ErrorHandleResult.Handled) + { + await messageReceiver.SafeCompleteAsync(transportTransactionMode, lockToken).ConfigureAwait(false); + } + + scope?.Complete(); + } + + if (result == ErrorHandleResult.RetryRequired) + { + await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false); + + // Indicate to the Functions runtime not to complete the incoming message + throw; + } + } + catch (Exception onErrorException) when (onErrorException is MessageLockLostException || onErrorException is ServiceBusTimeoutException) { - // return to signal to the Functions host it can complete the incoming message - return; + functionExecutionContext.Logger.LogDebug("Failed to execute recoverability.", onErrorException); + + throw; } + catch (Exception onErrorException) + { + functionExecutionContext.Logger.LogCritical($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorException); + + await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false); - throw; + throw; + } } + } - MessageContext CreateMessageContext(Message originalMessage) + static MessageContext CreateMessageContext(Message originalMessage, string messageId, Dictionary headers, byte[] body, TransportTransaction transportTransaction) + { + var contextBag = new ContextBag(); + contextBag.Set(originalMessage); + + return new MessageContext( + messageId, + headers, + body, + transportTransaction, + new CancellationTokenSource(), + contextBag); + } + + static ErrorContext CreateErrorContext(Exception exception, string messageId, Dictionary headers, byte[] body, TransportTransaction transportTransaction, int immediateProcessingFailures) + { + return new ErrorContext( + exception, + headers, + messageId, + body, + transportTransaction, + immediateProcessingFailures); + } + + static TransportTransaction CreateTransportTransaction(bool useTransaction, IClientEntity messageReceiver, string incomingQueuePartitionKey) + { + var transportTransaction = new TransportTransaction(); + + if (useTransaction) { - return new MessageContext( - originalMessage.GetMessageId(), - originalMessage.GetHeaders(), - originalMessage.Body, - new TransportTransaction(), - new CancellationTokenSource(), - new ContextBag()); + transportTransaction.Set((messageReceiver.ServiceBusConnection, messageReceiver.Path)); + transportTransaction.Set("IncomingQueue.PartitionKey", incomingQueuePartitionKey); } + + return transportTransaction; } } } diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs index 4b3b30ff..1b59a06a 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/FunctionsLoggerFactory.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Threading; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs index 59b5290d..6ec720c5 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Logging/Logger.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Threading; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs index 20409307..22c2c78f 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.InteropExtensions; static class MessageExtensions { @@ -40,5 +41,15 @@ public static string GetMessageId(this Message message) return message.MessageId; } + + public static byte[] GetBody(this Message message) + { + if (message.UserProperties.TryGetValue("NServiceBus.Transport.Encoding", out var value) && value.Equals("wcf/byte-array")) + { + return message.GetBody() ?? Array.Empty(); + } + + return message.Body ?? Array.Empty(); + } } } \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs new file mode 100644 index 00000000..6e1d56fd --- /dev/null +++ b/src/NServiceBus.AzureFunctions.ServiceBus/MessageReceiverExtensions.cs @@ -0,0 +1,38 @@ +namespace NServiceBus.AzureFunctions.ServiceBus +{ + using System.Threading.Tasks; + using Microsoft.Azure.ServiceBus.Core; + + static class MessageReceiverExtensions + { + public static Task SafeCompleteAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + return messageReceiver.CompleteAsync(lockToken); + } + + return Task.CompletedTask; + } + + public static Task SafeAbandonAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + return messageReceiver.AbandonAsync(lockToken); + } + + return Task.CompletedTask; + } + + public static Task SafeDeadLetterAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken, string deadLetterReason, string deadLetterErrorDescription) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + return messageReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription); + } + + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs index 0b3a9e29..103aad43 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Configuration/ServerlessTransportExtensions.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using Configuration.AdvancedExtensibility; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs index 72f4c1dd..dd2135e3 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/Recoverability/ServerlessRecoverabilityPolicy.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs index f0128920..2f207056 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpoint.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.IO; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs index 194ae3e1..e14b2235 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/ServerlessEndpointConfiguration.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Security.Cryptography; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs index 56a826c3..35432bb4 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ManualPipelineInvocationInfrastructure.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System.Threading.Tasks; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs index 80b5bd62..5cd6f68b 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/NoOpQueueCreator.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System.Threading.Tasks; using Transport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs index 9d37e265..2bb77c99 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Threading.Tasks; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs index 8dae6ef2..1b5ea999 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using Settings; using Transport; @@ -18,7 +18,8 @@ public ServerlessTransport() public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString) { var baseTransportInfrastructure = baseTransport.Initialize(settings, connectionString); - return new ServerlessTransportInfrastructure(baseTransportInfrastructure, settings); + + return new ServerlessTransportInfrastructure(baseTransportInfrastructure, settings); } readonly TBaseTransport baseTransport; diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index 5e7ac656..d8a9f612 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { using System; using System.Collections.Generic; @@ -6,27 +6,24 @@ using Settings; using Transport; - class ServerlessTransportInfrastructure : TransportInfrastructure + class ServerlessTransportInfrastructure : TransportInfrastructure { - public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, - SettingsHolder settings) + public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, SettingsHolder settings) { this.baseTransportInfrastructure = baseTransportInfrastructure; this.settings = settings; } - public override IEnumerable DeliveryConstraints => - baseTransportInfrastructure.DeliveryConstraints; + public override IEnumerable DeliveryConstraints => baseTransportInfrastructure.DeliveryConstraints; - //support ReceiveOnly so that we can use immediate retries - public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.ReceiveOnly; + public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.SendsAtomicWithReceive; - public override OutboundRoutingPolicy OutboundRoutingPolicy => - baseTransportInfrastructure.OutboundRoutingPolicy; + public override OutboundRoutingPolicy OutboundRoutingPolicy => baseTransportInfrastructure.OutboundRoutingPolicy; public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() { var pipelineInvoker = settings.GetOrCreate(); + return new ManualPipelineInvocationInfrastructure(pipelineInvoker); } diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs b/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs index 6695f57e..4959f3c2 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs @@ -3,7 +3,7 @@ using Logging; using Microsoft.Azure.WebJobs; using System; - using AzureFunctions; + using AzureFunctions.ServiceBus; /// /// Represents a serverless NServiceBus endpoint running within an AzureServiceBus trigger. diff --git a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 465777e8..47a4f03d 100644 --- a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -1,5 +1,5 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"ServiceBus.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] -namespace NServiceBus.AzureFunctions +namespace NServiceBus.AzureFunctions.ServiceBus { public abstract class ServerlessEndpointConfiguration { @@ -12,7 +12,7 @@ namespace NServiceBus.AzureFunctions where TTransport : NServiceBus.Transport.TransportDefinition, new () { } } public abstract class ServerlessEndpoint - where TConfiguration : NServiceBus.AzureFunctions.ServerlessEndpointConfiguration + where TConfiguration : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpointConfiguration { protected System.Func AssemblyDirectoryResolver; protected ServerlessEndpoint(System.Func configurationFactory) { } @@ -23,10 +23,10 @@ namespace NServiceBus.AzureFunctions } namespace NServiceBus { - public class FunctionEndpoint : NServiceBus.AzureFunctions.ServerlessEndpoint + public class FunctionEndpoint : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpoint { public FunctionEndpoint(System.Func configurationFactory) { } - public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver = null) { } } public class FunctionExecutionContext { @@ -34,7 +34,7 @@ namespace NServiceBus public Microsoft.Azure.WebJobs.ExecutionContext ExecutionContext { get; } public Microsoft.Extensions.Logging.ILogger Logger { get; } } - public class ServiceBusTriggeredEndpointConfiguration : NServiceBus.AzureFunctions.ServerlessEndpointConfiguration + public class ServiceBusTriggeredEndpointConfiguration : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpointConfiguration { public ServiceBusTriggeredEndpointConfiguration(string endpointName, string connectionStringName = null) { } public NServiceBus.TransportExtensions Transport { get; } diff --git a/src/ServiceBus.Tests/FunctionEndpointComponent.cs b/src/ServiceBus.Tests/FunctionEndpointComponent.cs index b319e93e..a8ab738f 100644 --- a/src/ServiceBus.Tests/FunctionEndpointComponent.cs +++ b/src/ServiceBus.Tests/FunctionEndpointComponent.cs @@ -8,6 +8,8 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; + using Microsoft.Extensions.Logging; using NServiceBus; using NServiceBus.AcceptanceTesting; using NServiceBus.AcceptanceTesting.Customization; @@ -32,8 +34,8 @@ public Task CreateRunner(RunDescriptor runDescriptor) { return Task.FromResult( new FunctionRunner( - Messages, - CustomizeConfiguration, + Messages, + CustomizeConfiguration, runDescriptor.ScenarioContext, GetType())); } @@ -107,7 +109,7 @@ public override async Task ComponentsStarted(CancellationToken token) { var transportMessage = GenerateMessage(message); var context = new ExecutionContext(); - await endpoint.Process(transportMessage, context); + await endpoint.Process(transportMessage, context, new FakeLogger(), new FakeMessageReceiver()); } } @@ -153,5 +155,92 @@ Message GenerateMessage(object message) FunctionEndpoint endpoint; IMessageSerializer messageSerializer; } + + class FakeLogger : ILogger + { + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + + } + + public bool IsEnabled(LogLevel logLevel) + { + return false; + } + + public IDisposable BeginScope(TState state) + { + return null; + } + } + + class FakeMessageReceiver : IMessageReceiver + { + public Task CloseAsync() => Task.CompletedTask; + + public void RegisterPlugin(ServiceBusPlugin serviceBusPlugin) + { + } + + public void UnregisterPlugin(string serviceBusPluginName) + { + } + + public string ClientId { get; } + public bool IsClosedOrClosing { get; } + public string Path { get; } + public TimeSpan OperationTimeout { get; set; } + public ServiceBusConnection ServiceBusConnection { get; } + public bool OwnsConnection { get; } + public IList RegisteredPlugins { get; } + + public void RegisterMessageHandler(Func handler, Func exceptionReceivedHandler) + { + } + + public void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions) + { + } + + public Task CompleteAsync(string lockToken) => Task.CompletedTask; + + public Task AbandonAsync(string lockToken, IDictionary propertiesToModify = null) => Task.CompletedTask; + + public Task DeadLetterAsync(string lockToken, IDictionary propertiesToModify = null) => Task.CompletedTask; + + public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) => Task.CompletedTask; + + public int PrefetchCount { get; set; } + public ReceiveMode ReceiveMode { get; } + public Task ReceiveAsync() => throw new NotImplementedException(); + + public Task ReceiveAsync(TimeSpan operationTimeout) => throw new NotImplementedException(); + + public Task> ReceiveAsync(int maxMessageCount) => throw new NotImplementedException(); + + public Task> ReceiveAsync(int maxMessageCount, TimeSpan operationTimeout) => throw new NotImplementedException(); + + public Task ReceiveDeferredMessageAsync(long sequenceNumber) => throw new NotImplementedException(); + + public Task> ReceiveDeferredMessageAsync(IEnumerable sequenceNumbers) => throw new NotImplementedException(); + + public Task CompleteAsync(IEnumerable lockTokens) => throw new NotImplementedException(); + + public Task DeferAsync(string lockToken, IDictionary propertiesToModify = null) => throw new NotImplementedException(); + + public Task RenewLockAsync(Message message) => throw new NotImplementedException(); + + public Task RenewLockAsync(string lockToken) => throw new NotImplementedException(); + + public Task PeekAsync() => throw new NotImplementedException(); + + public Task> PeekAsync(int maxMessageCount) => throw new NotImplementedException(); + + public Task PeekBySequenceNumberAsync(long fromSequenceNumber) => throw new NotImplementedException(); + + public Task> PeekBySequenceNumberAsync(long fromSequenceNumber, int messageCount) => throw new NotImplementedException(); + + public long LastPeekedSequenceNumber { get; } + } } } \ No newline at end of file diff --git a/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs b/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs index e221ce56..1ff58dce 100644 --- a/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs +++ b/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs @@ -62,6 +62,17 @@ Message GenerateMessage() var message = new Message(bytes); message.UserProperties["NServiceBus.EnclosedMessageTypes"] = "Testing.Handlers.DummyMessage"; + var systemProperties = new Message.SystemPropertiesCollection(); + // sequence number is required to prevent SystemPropertiesCollection from throwing on the getters + var fieldInfo = typeof(Message.SystemPropertiesCollection).GetField("sequenceNumber", BindingFlags.NonPublic | BindingFlags.Instance); + fieldInfo.SetValue(systemProperties, 123); + // set delivery count to 1 + var deliveryCountProperty = typeof(Message.SystemPropertiesCollection).GetProperty("DeliveryCount"); + deliveryCountProperty.SetValue(systemProperties, 1); + // assign test message mocked system properties + var property = typeof(Message).GetProperty("SystemProperties"); + property.SetValue(message, systemProperties); + return message; } }