diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/AttributeDiscoverer.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/AttributeDiscoverer.cs deleted file mode 100644 index 1a1434c7..00000000 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/AttributeDiscoverer.cs +++ /dev/null @@ -1,38 +0,0 @@ -namespace NServiceBus.AzureFunctions.InProcess.ServiceBus -{ - using System; - using System.Diagnostics; - using System.Reflection; - using Microsoft.Azure.WebJobs; - - static class TriggerDiscoverer - { - /// - /// Attempts to derive the required configuration information from the Azure Function and trigger attributes via reflection. - /// - public static TTransportTriggerAttribute TryGet() where TTransportTriggerAttribute : Attribute - { - var frames = new StackTrace().GetFrames(); - foreach (var stackFrame in frames) - { - var method = stackFrame.GetMethod(); - var functionAttribute = method.GetCustomAttribute(false); - if (functionAttribute != null) - { - foreach (var parameter in method.GetParameters()) - { - var triggerConfiguration = parameter.GetCustomAttribute(false); - if (triggerConfiguration != null) - { - return triggerConfiguration; - } - } - - return null; - } - } - - return null; - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FodyWeavers.xml b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FodyWeavers.xml new file mode 100644 index 00000000..a988537c --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FodyWeavers.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionEndpoint.cs index cae7cb57..bcb5321a 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionEndpoint.cs @@ -10,6 +10,8 @@ using Extensibility; using Logging; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; + using Microsoft.Azure.WebJobs; using Microsoft.Extensions.Logging; using Transport; using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext; @@ -28,54 +30,118 @@ internal FunctionEndpoint(IStartableEndpointWithExternallyManagedContainer exter endpointFactory = _ => externallyManagedContainerEndpoint.Start(serviceProvider); } + /// + /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. This method will lookup the setting to determine whether to use transactional or non-transactional processing. + /// + Task IFunctionEndpoint.Process(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger) => + ReflectionHelper.GetAutoCompleteValue() + ? ProcessNonTransactional(message, executionContext, messageReceiver, functionsLogger) + : ProcessTransactional(message, executionContext, messageReceiver, functionsLogger); + + /// + /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. All messages are committed transactionally with the successful processing of the incoming message. + /// Requires to be set to false! + /// + public async Task ProcessTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null) + { + FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); + + var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); + + try + { + await InitializeEndpointIfNecessary(functionExecutionContext, CancellationToken.None) + .ConfigureAwait(false); + + await Process(message, new MessageReceiverTransactionStrategy(message, messageReceiver), pipeline) + .ConfigureAwait(false); + } + catch (Exception) + { + // abandon message outside of a transaction scope to ensure the abandon operation can't be rolled back + await messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + throw; + } + } + + /// + /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. + /// + public Task ProcessNonTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null) => Process(message, executionContext, functionsLogger); + /// /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. /// + [ObsoleteEx( + ReplacementTypeOrMember = "Process(Message, ExecutionContext, IMessageReceiver, ILogger)", + TreatAsErrorFromVersion = "2", + RemoveInVersion = "3")] public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null) { FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); - var messageContext = CreateMessageContext(message); var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); - await InitializeEndpointIfNecessary(functionExecutionContext, - messageContext.ReceiveCancellationTokenSource.Token).ConfigureAwait(false); + await InitializeEndpointIfNecessary(functionExecutionContext, CancellationToken.None) + .ConfigureAwait(false); + + await Process(message, NoTransactionStrategy.Instance, pipeline) + .ConfigureAwait(false); + } + + internal static async Task Process(Message message, ITransactionStrategy transactionStrategy, PipelineInvoker pipeline) + { + var messageId = message.GetMessageId(); try { - await pipeline.PushMessage(messageContext).ConfigureAwait(false); + using (var transaction = transactionStrategy.CreateTransaction()) + { + var transportTransaction = transactionStrategy.CreateTransportTransaction(transaction); + var messageContext = CreateMessageContext(transportTransaction); + + await pipeline.PushMessage(messageContext).ConfigureAwait(false); + + await transactionStrategy.Complete(transaction).ConfigureAwait(false); + + transaction?.Commit(); + } } catch (Exception exception) { - var errorContext = new ErrorContext( - exception, - message.GetHeaders(), - messageContext.MessageId, - messageContext.Body, - new TransportTransaction(), - message.SystemProperties.DeliveryCount); + using (var transaction = transactionStrategy.CreateTransaction()) + { + var transportTransaction = transactionStrategy.CreateTransportTransaction(transaction); + var errorContext = new ErrorContext( + exception, + message.GetHeaders(), + messageId, + message.Body, + transportTransaction, + message.SystemProperties.DeliveryCount); + + var errorHandleResult = await pipeline.PushFailedMessage(errorContext).ConfigureAwait(false); + + if (errorHandleResult == ErrorHandleResult.Handled) + { + await transactionStrategy.Complete(transaction).ConfigureAwait(false); - var errorHandleResult = await pipeline.PushFailedMessage(errorContext).ConfigureAwait(false); + transaction?.Commit(); + return; + } - if (errorHandleResult == ErrorHandleResult.Handled) - { - // return to signal to the Functions host it can complete the incoming message - return; + throw; } - - throw; } - MessageContext CreateMessageContext(Message originalMessage) - { - return new MessageContext( - originalMessage.GetMessageId(), - originalMessage.GetHeaders(), - originalMessage.Body, - new TransportTransaction(), + MessageContext CreateMessageContext(TransportTransaction transportTransaction) => + new MessageContext( + messageId, + message.GetHeaders(), + message.Body, + transportTransaction, new CancellationTokenSource(), new ContextBag()); - } } /// diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/IFunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/IFunctionEndpoint.cs index b9be6404..5b7de981 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/IFunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/IFunctionEndpoint.cs @@ -3,6 +3,7 @@ using System; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; using Microsoft.Azure.WebJobs; using Microsoft.Extensions.Logging; @@ -12,9 +13,18 @@ /// public interface IFunctionEndpoint { + /// + /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. This method will lookup the setting to determine whether to use transactional or non-transactional processing. + /// + Task Process(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null); + /// /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. /// + [ObsoleteEx( + ReplacementTypeOrMember = "Process(Message, ExecutionContext, IMessageReceiver, ILogger)", + TreatAsErrorFromVersion = "2", + RemoveInVersion = "3")] Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null); /// diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBus.AzureFunctions.InProcess.ServiceBus.csproj b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBus.AzureFunctions.InProcess.ServiceBus.csproj index 6e3203e5..9359d294 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBus.AzureFunctions.InProcess.ServiceBus.csproj +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBus.AzureFunctions.InProcess.ServiceBus.csproj @@ -16,6 +16,7 @@ + diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusEndpointNameAttribute.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusEndpointNameAttribute.cs index 37b15050..ddad5e57 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusEndpointNameAttribute.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusEndpointNameAttribute.cs @@ -4,6 +4,10 @@ /// Assembly attribute to specify NServiceBus logical endpoint name. /// This name is used to wire up an auto-generated service bus trigger function, responding to messages in the queue specified by the name provided. /// + [ObsoleteEx( + ReplacementTypeOrMember = nameof(NServiceBusTriggerFunctionAttribute), + TreatAsErrorFromVersion = "2", + RemoveInVersion = "3")] [System.AttributeUsage(System.AttributeTargets.Assembly)] public sealed class NServiceBusEndpointNameAttribute : System.Attribute { diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusTriggerFunctionAttribute.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusTriggerFunctionAttribute.cs new file mode 100644 index 00000000..21a555fb --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBusTriggerFunctionAttribute.cs @@ -0,0 +1,34 @@ +namespace NServiceBus +{ + /// + /// Assembly attribute to configure generated NServiceBus Azure Function. + /// The attribute is used to wire up an auto-generated Service Bus trigger function, responding to messages in the queue specified by the name provided. + /// + [System.AttributeUsage(System.AttributeTargets.Assembly)] + public sealed class NServiceBusTriggerFunctionAttribute : System.Attribute + { + /// + /// Endpoint name that is the input queue name. + /// + public string EndpointName { get; } + + /// + /// Override trigger function name. + /// + public string TriggerFunctionName { get; set; } + + /// + /// Enable cross-entity transactions. + /// + public bool EnableCrossEntityTransactions { get; set; } + + /// + /// Endpoint logical name. + /// + /// Endpoint name that is the input queue name. + public NServiceBusTriggerFunctionAttribute(string endpointName) + { + EndpointName = endpointName; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ReflectionHelper.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ReflectionHelper.cs new file mode 100644 index 00000000..21d7bd08 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ReflectionHelper.cs @@ -0,0 +1,50 @@ +namespace NServiceBus.AzureFunctions.InProcess.ServiceBus +{ + using System; + using System.Diagnostics; + using System.Reflection; + using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.WebJobs; + + class ReflectionHelper + { + public static bool GetAutoCompleteValue() + { + var triggerAttribute = FindTriggerAttributeInternal(); + if (triggerAttribute != null) + { + return triggerAttribute.AutoComplete; + } + + throw new Exception($"Could not locate {nameof(ServiceBusTriggerAttribute)} to infer the AutoComplete setting. Make sure that the function trigger contains a parameter decorated with {nameof(ServiceBusTriggerAttribute)} or use the advanced APIs exposed via the {nameof(FunctionEndpoint)} type instead."); + } + + public static ServiceBusTriggerAttribute FindBusTriggerAttribute() => FindTriggerAttributeInternal(); + + static ServiceBusTriggerAttribute FindTriggerAttributeInternal() + { + var st = new StackTrace(skipFrames: 2); // skip first two frames because it is this method + the public method + var frames = st.GetFrames(); + foreach (var frame in frames) + { + var method = frame?.GetMethod(); + if (method?.GetCustomAttribute(false) != null) + { + foreach (var parameter in method.GetParameters()) + { + ServiceBusTriggerAttribute serviceBusTriggerAttribute; + if (parameter.ParameterType == typeof(Message) + && (serviceBusTriggerAttribute = parameter.GetCustomAttribute(false)) != null) + { + return serviceBusTriggerAttribute; + } + } + + return null; + } + } + + return null; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs index fb022432..fe605334 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs @@ -4,7 +4,6 @@ using System.Threading.Tasks; using AzureFunctions.InProcess.ServiceBus; using Logging; - using Microsoft.Azure.WebJobs; using Microsoft.Extensions.Configuration; using Serialization; using Transport; @@ -134,10 +133,10 @@ static string GetConfiguredValueOrFallback(IConfiguration configuration, string /// public static ServiceBusTriggeredEndpointConfiguration FromAttributes() { - var configuration = TriggerDiscoverer.TryGet(); - if (configuration != null) + var serviceBusTriggerAttribute = ReflectionHelper.FindBusTriggerAttribute(); + if (serviceBusTriggerAttribute != null) { - return new ServiceBusTriggeredEndpointConfiguration(configuration.QueueName, configuration.Connection); + return new ServiceBusTriggeredEndpointConfiguration(serviceBusTriggerAttribute.QueueName, serviceBusTriggerAttribute.Connection); } throw new Exception( diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/ITransactionStrategy.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/ITransactionStrategy.cs new file mode 100644 index 00000000..771abeb1 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/ITransactionStrategy.cs @@ -0,0 +1,13 @@ +namespace NServiceBus +{ + using System.Threading.Tasks; + using System.Transactions; + using Transport; + + interface ITransactionStrategy + { + CommittableTransaction CreateTransaction(); + TransportTransaction CreateTransportTransaction(CommittableTransaction transaction); + Task Complete(CommittableTransaction transaction); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/MessageReceiverTransactionStrategy.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/MessageReceiverTransactionStrategy.cs new file mode 100644 index 00000000..b5beb92d --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/MessageReceiverTransactionStrategy.cs @@ -0,0 +1,47 @@ +namespace NServiceBus +{ + using System.Threading.Tasks; + using System.Transactions; + using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.ServiceBus.Core; + using Transport; + + class MessageReceiverTransactionStrategy : ITransactionStrategy + { + readonly Message message; + readonly IMessageReceiver messageReceiver; + + public MessageReceiverTransactionStrategy(Message message, IMessageReceiver messageReceiver) + { + this.message = message; + this.messageReceiver = messageReceiver; + } + + public CommittableTransaction CreateTransaction() => + new CommittableTransaction(new TransactionOptions + { + IsolationLevel = IsolationLevel.Serializable, + Timeout = TransactionManager.MaximumTimeout + }); + + public TransportTransaction CreateTransportTransaction(CommittableTransaction transaction) + { + var transportTransaction = new TransportTransaction(); + transportTransaction.Set((messageReceiver.ServiceBusConnection, messageReceiver.Path)); + transportTransaction.Set("IncomingQueue.PartitionKey", message.PartitionKey); + transportTransaction.Set(transaction); + return transportTransaction; + } + + public async Task Complete(CommittableTransaction transaction) + { + // open short-lived TransactionScope connected to the committable transaction to ensure the message operation has a scope to enlist. + using (var scope = new TransactionScope(transaction, TransactionScopeAsyncFlowOption.Enabled)) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken) + .ConfigureAwait(false); + scope.Complete(); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/NoTransactionStrategy.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/NoTransactionStrategy.cs new file mode 100644 index 00000000..b2c34f04 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Transactions/NoTransactionStrategy.cs @@ -0,0 +1,18 @@ +namespace NServiceBus +{ + using System.Threading.Tasks; + using System.Transactions; + using Transport; + + class NoTransactionStrategy : ITransactionStrategy + { + public virtual CommittableTransaction CreateTransaction() => null; + + public virtual TransportTransaction CreateTransportTransaction(CommittableTransaction transaction) => + new TransportTransaction(); + + public virtual Task Complete(CommittableTransaction transaction) => Task.CompletedTask; + + public static NoTransactionStrategy Instance { get; } = new NoTransactionStrategy(); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Can_enable_transactions.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Can_enable_transactions.approved.txt new file mode 100644 index 00000000..4c8ea9d2 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Can_enable_transactions.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("NServiceBusFunctionEndpointTrigger-endpoint")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = false)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Can_override_trigger_function_name.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Can_override_trigger_function_name.approved.txt new file mode 100644 index 00000000..fd792927 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Can_override_trigger_function_name.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("trigger")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = true)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.NameIsStringValue.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.NameIsStringValue.approved.txt new file mode 100644 index 00000000..e16a7f0f --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.NameIsStringValue.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("NServiceBusFunctionEndpointTrigger-endpoint")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = true)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.No_attribute_should_not_generate_trigger_function.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.No_attribute_should_not_generate_trigger_function.approved.txt new file mode 100644 index 00000000..e69de29b diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Two_optionals_out_of_order.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Two_optionals_out_of_order.approved.txt new file mode 100644 index 00000000..70cc6c8a --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Two_optionals_out_of_order.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("trigger")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = false)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Use_two_optionals.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Use_two_optionals.approved.txt new file mode 100644 index 00000000..70cc6c8a --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.Use_two_optionals.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("trigger")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = false)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.UsingFullyQualifiedAttributeName.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.UsingFullyQualifiedAttributeName.approved.txt new file mode 100644 index 00000000..e16a7f0f --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.UsingFullyQualifiedAttributeName.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("NServiceBusFunctionEndpointTrigger-endpoint")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = true)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.UsingNamespace.approved.txt b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.UsingNamespace.approved.txt new file mode 100644 index 00000000..e16a7f0f --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/ApprovalFiles/SourceGeneratorApprovals2.UsingNamespace.approved.txt @@ -0,0 +1,28 @@ +// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + { + this.endpoint = endpoint; + } + + [FunctionName("NServiceBusFunctionEndpointTrigger-endpoint")] + public async Task Run( + [ServiceBusTrigger(queueName: "endpoint", AutoComplete = true)] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + { + await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApproval2.cs b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApproval2.cs new file mode 100644 index 00000000..9217ceae --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApproval2.cs @@ -0,0 +1,238 @@ +namespace NServiceBus.AzureFunctions.SourceGenerator.Tests +{ + using System; + using System.Collections.Generic; + using System.Collections.Immutable; + using System.Linq; + using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.WebJobs; + using Microsoft.CodeAnalysis; + using Microsoft.CodeAnalysis.CSharp; + using Microsoft.Extensions.Logging; + using NUnit.Framework; + using Particular.Approvals; + + [TestFixture] + public class SourceGeneratorApprovals2 + { + [Test] + public void UsingNamespace() + { + var source = +@"using NServiceBus; + +[assembly: NServiceBusTriggerFunction(Foo.Startup.EndpointName)] + +namespace Foo +{ + public class Startup + { + public const string EndpointName = ""endpoint""; + } +}"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [Test] + public void UsingFullyQualifiedAttributeName() + { + var source = +@"[assembly: NServiceBus.NServiceBusTriggerFunction(Foo.Startup.EndpointName)] + +namespace Foo +{ + public class Startup + { + public const string EndpointName = ""endpoint""; + } +}"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [Test] + public void NameIsStringValue() + { + var source = @"[assembly: NServiceBus.NServiceBusTriggerFunction(""endpoint"")]"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [Test] + public void No_attribute_should_not_generate_trigger_function() + { + var source = @""; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [Test] + public void No_attribute_should_not_generate_compilation_error() + { + var source = @"using NServiceBus;"; + var (_, diagnostics) = GetGeneratedOutput(source); + + Assert.False(diagnostics.Any(d => d.Severity == DiagnosticSeverity.Error)); + } + + [Test] + public void Can_override_trigger_function_name() + { + var source = + @"using NServiceBus; + +[assembly: NServiceBusTriggerFunction(""endpoint"", TriggerFunctionName = ""trigger"")] + +public class Startup +{ +}"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [TestCase(null)] + [TestCase("")] + [TestCase(" ")] + public void Invalid_name_should_cause_an_error(string endpointName) + { + var source = @" +using NServiceBus; + +[assembly: NServiceBusTriggerFunction(""" + endpointName + @""")] +"; + var (_, diagnostics) = GetGeneratedOutput(source, suppressGeneratedDiagnosticsErrors: true); + + Assert.True(diagnostics.Any(d => d.Severity == DiagnosticSeverity.Error && d.Id == TriggerFunctionGenerator2.InvalidEndpointNameError.Id)); + } + + [TestCase(null)] + [TestCase("")] + [TestCase(" ")] + public void Invalid_trigger_function_name_should_cause_an_error(string triggerFunctionName) + { + var source = @" +using NServiceBus; + +[assembly: NServiceBusTriggerFunction(""endpoint"", TriggerFunctionName = """ + triggerFunctionName + @""")] +"; + var (_, diagnostics) = GetGeneratedOutput(source, suppressGeneratedDiagnosticsErrors: true); + + Assert.True(diagnostics.Any(d => d.Severity == DiagnosticSeverity.Error && d.Id == TriggerFunctionGenerator2.InvalidTriggerFunctionNameError.Id)); + } + + [Test] + public void Can_enable_transactions() + { + var source = @" +using NServiceBus; + +[assembly: NServiceBusTriggerFunction(""endpoint"", EnableCrossEntityTransactions = true)] + +public class Startup +{ +}"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [Test] + public void Use_two_optionals() + { + var source = @" +using NServiceBus; + +[assembly: NServiceBusTriggerFunction(""endpoint"", TriggerFunctionName = ""trigger"", EnableCrossEntityTransactions = true)] + +public class Startup +{ +}"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [Test] + public void Two_optionals_out_of_order() + { + var source = @" +using NServiceBus; + +[assembly: NServiceBusTriggerFunction(""endpoint"", EnableCrossEntityTransactions = true, TriggerFunctionName = ""trigger"")] + +public class Startup +{ +}"; + var (output, _) = GetGeneratedOutput(source); + + Approver.Verify(output); + } + + [OneTimeSetUp] + public void Init() + { + // For the unit tests to work, the compilation used by the source generator needs to know that NServiceBusTriggerFunction + // is an attribute from NServiceBus namespace and its full name is NServiceBus.NServiceBusTriggerFunctionAttribute. + // By referencing NServiceBusTriggerFunctionAttribute here, NServiceBus.AzureFunctions.InProcess.ServiceBus is forced to load and participate in the compilation. + _ = new NServiceBusTriggerFunctionAttribute(endpointName: "test"); + } + + static (string output, ImmutableArray diagnostics) GetGeneratedOutput(string source, bool suppressGeneratedDiagnosticsErrors = false) + { + var syntaxTree = CSharpSyntaxTree.ParseText(source); + var references = new List(); + var assemblies = AppDomain.CurrentDomain.GetAssemblies(); + + foreach (var assembly in assemblies) + { + if (!assembly.IsDynamic && !string.IsNullOrWhiteSpace(assembly.Location)) + { + references.Add(MetadataReference.CreateFromFile(assembly.Location)); + } + } + + var compilation = Compile(new[] + { + syntaxTree + }, references); + + var generator = new TriggerFunctionGenerator2(); + + var driver = CSharpGeneratorDriver.Create(generator); + driver.RunGeneratorsAndUpdateCompilation(compilation, out var outputCompilation, out var generateDiagnostics); + + // add necessary references for the generated trigger + references.Add(MetadataReference.CreateFromFile(typeof(ServiceBusTriggerAttribute).Assembly.Location)); + references.Add(MetadataReference.CreateFromFile(typeof(ExecutionContext).Assembly.Location)); + references.Add(MetadataReference.CreateFromFile(typeof(Message).Assembly.Location)); + references.Add(MetadataReference.CreateFromFile(typeof(ILogger).Assembly.Location)); + Compile(outputCompilation.SyntaxTrees, references); + + if (!suppressGeneratedDiagnosticsErrors) + { + Assert.False(generateDiagnostics.Any(d => d.Severity == DiagnosticSeverity.Error), "Failed: " + generateDiagnostics.FirstOrDefault()?.GetMessage()); + } + + return (outputCompilation.SyntaxTrees.Last().ToString(), generateDiagnostics); + } + + static CSharpCompilation Compile(IEnumerable syntaxTrees, IEnumerable references) + { + var compilation = CSharpCompilation.Create("result", syntaxTrees, references, new CSharpCompilationOptions(OutputKind.DynamicallyLinkedLibrary)); + + // Verify the code compiled: + var compilationErrors = compilation + .GetDiagnostics() + .Where(d => d.Severity >= DiagnosticSeverity.Warning); + Assert.IsEmpty(compilationErrors, compilationErrors.FirstOrDefault()?.GetMessage()); + + return compilation; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApprovals.cs b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApprovals.cs index 46f1b7b1..193aac7f 100644 --- a/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApprovals.cs +++ b/src/NServiceBus.AzureFunctions.SourceGenerator.Tests/SourceGeneratorApprovals.cs @@ -1,4 +1,6 @@ -namespace NServiceBus.AzureFunctions.SourceGenerator.Tests +#pragma warning disable IDE0079 // Remove unnecessary suppression +#pragma warning disable CS0618 // Disable obsolete warning for NServiceBusEndpointNameAttribute +namespace NServiceBus.AzureFunctions.SourceGenerator.Tests { using System; using System.Collections.Generic; @@ -166,4 +168,6 @@ public void Init() return (outputCompilation.SyntaxTrees.Last().ToString(), generateDiagnostics); } } -} \ No newline at end of file +} +#pragma warning restore CS0618 +#pragma warning restore IDE0079 // Remove unnecessary suppression \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.SourceGenerator/TriggerFunctionGenerator2.cs b/src/NServiceBus.AzureFunctions.SourceGenerator/TriggerFunctionGenerator2.cs new file mode 100644 index 00000000..bd1aa068 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.SourceGenerator/TriggerFunctionGenerator2.cs @@ -0,0 +1,142 @@ +namespace NServiceBus.AzureFunctions.SourceGenerator +{ + using System.Text; + using Microsoft.CodeAnalysis; + using Microsoft.CodeAnalysis.CSharp.Syntax; + using Microsoft.CodeAnalysis.Text; + + [Generator] + public class TriggerFunctionGenerator2 : ISourceGenerator + { + internal static readonly DiagnosticDescriptor InvalidEndpointNameError = new DiagnosticDescriptor(id: "NSBFUNC003", + title: "Invalid Endpoint Name", + messageFormat: "Endpoint name is invalid and cannot be used to generate trigger function", + category: "TriggerFunctionGenerator", + DiagnosticSeverity.Error, + isEnabledByDefault: true); + + internal static readonly DiagnosticDescriptor InvalidTriggerFunctionNameError = new DiagnosticDescriptor(id: "NSBFUNC004", + title: "Invalid Trigger Function Name", + messageFormat: "Trigger function name is invalid and cannot be used to generate trigger function", + category: "TriggerFunctionGenerator", + DiagnosticSeverity.Error, + isEnabledByDefault: true); + + public void Initialize(GeneratorInitializationContext context) + { + context.RegisterForSyntaxNotifications(() => new SyntaxReceiver()); + } + + class SyntaxReceiver : ISyntaxContextReceiver + { + internal string endpointName; + internal string triggerFunctionName; + internal bool enableCrossEntityTransactions; + internal bool attributeFound; + + public void OnVisitSyntaxNode(GeneratorSyntaxContext context) + { + if (context.Node is AttributeSyntax attributeSyntax + && IsNServiceBusEndpointNameAttribute(context.SemanticModel.GetTypeInfo(attributeSyntax).Type?.ToDisplayString())) + { + attributeFound = true; + + // Assign guaranteed endpoint/queue name and handle the defaults + endpointName = AttributeParameterAtPosition(0); + triggerFunctionName = $"NServiceBusFunctionEndpointTrigger-{endpointName}"; + enableCrossEntityTransactions = false; + + var attributeParametersCount = AttributeParametersCount(); + + if (attributeParametersCount == 1) + { + return; + } + + if (bool.TryParse(AttributeParameterAtPosition(1), out enableCrossEntityTransactions)) + { + // 2nd parameter was cross entity transaction flag + // 3rd parameter might be trigger function name + triggerFunctionName = attributeParametersCount == 3 + ? AttributeParameterAtPosition(2) + : triggerFunctionName; + } + else + { + // 2nd parameter was triggerFunctionName + triggerFunctionName = AttributeParameterAtPosition(1); + + // 3rd parameter might be cross entity transaction flag + enableCrossEntityTransactions = attributeParametersCount == 3 && bool.Parse(AttributeParameterAtPosition(2)); + } + } + + bool IsNServiceBusEndpointNameAttribute(string value) => value?.Equals("NServiceBus.NServiceBusTriggerFunctionAttribute") ?? false; + string AttributeParameterAtPosition(int position) => context.SemanticModel.GetConstantValue(attributeSyntax.ArgumentList.Arguments[position].Expression).ToString(); + int AttributeParametersCount() => attributeSyntax.ArgumentList.Arguments.Count; + } + } + + public void Execute(GeneratorExecutionContext context) + { + // Short circuit if this is a different syntax receiver + if (!(context.SyntaxContextReceiver is SyntaxReceiver syntaxReceiver)) + { + return; + } + + // Skip processing if no attribute was found + if (!syntaxReceiver.attributeFound) + { + return; + } + + // Generate an error if empty/null/space is used as endpoint name + if (string.IsNullOrWhiteSpace(syntaxReceiver.endpointName)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidEndpointNameError, Location.None, syntaxReceiver.endpointName)); + return; + } + + // Generate an error if empty/null/space is used as trigger function name + if (string.IsNullOrWhiteSpace(syntaxReceiver.triggerFunctionName)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidTriggerFunctionNameError, Location.None, syntaxReceiver.triggerFunctionName)); + return; + } + + var source = +$@"// +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; +using NServiceBus; + +class FunctionEndpointTrigger +{{ + readonly FunctionEndpoint endpoint; + + public FunctionEndpointTrigger(FunctionEndpoint endpoint) + {{ + this.endpoint = endpoint; + }} + + [FunctionName(""{syntaxReceiver.triggerFunctionName}"")] + public async Task Run( + [ServiceBusTrigger(queueName: ""{syntaxReceiver.endpointName}"", AutoComplete = {(!syntaxReceiver.enableCrossEntityTransactions).ToString().ToLowerInvariant()})] + Message message, + MessageReceiver messageReceiver, + ILogger logger, + ExecutionContext executionContext) + {{ + {(syntaxReceiver.enableCrossEntityTransactions + ? "await endpoint.ProcessTransactional(message, executionContext, messageReceiver, logger);" + : "await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger);")} + }} +}}"; + context.AddSource("NServiceBus__FunctionEndpointTrigger", SourceText.From(source, Encoding.UTF8)); + } + } +} \ No newline at end of file diff --git a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index a7f59708..c5fb78a3 100644 --- a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -3,7 +3,11 @@ namespace NServiceBus { public class FunctionEndpoint : NServiceBus.IFunctionEndpoint { + [System.Obsolete("Use `Process(Message, ExecutionContext, IMessageReceiver, ILogger)` instead. Will" + + " be treated as an error from version 2.0.0. Will be removed in version 3.0.0.", false)] public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task ProcessNonTransactional(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task ProcessTransactional(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } public System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } public System.Threading.Tasks.Task Publish(System.Action messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } @@ -31,7 +35,10 @@ namespace NServiceBus } public interface IFunctionEndpoint { + [System.Obsolete("Use `Process(Message, ExecutionContext, IMessageReceiver, ILogger)` instead. Will" + + " be treated as an error from version 2.0.0. Will be removed in version 3.0.0.", false)] System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null); System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); System.Threading.Tasks.Task Publish(System.Action messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); @@ -46,6 +53,8 @@ namespace NServiceBus System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); } [System.AttributeUsage(System.AttributeTargets.Assembly | System.AttributeTargets.All)] + [System.Obsolete("Use `NServiceBusTriggerFunctionAttribute` instead. Will be treated as an error fr" + + "om version 2.0.0. Will be removed in version 3.0.0.", false)] public sealed class NServiceBusEndpointNameAttribute : System.Attribute { public NServiceBusEndpointNameAttribute(string name) { } @@ -53,6 +62,14 @@ namespace NServiceBus public string Name { get; } public string TriggerFunctionName { get; } } + [System.AttributeUsage(System.AttributeTargets.Assembly | System.AttributeTargets.All)] + public sealed class NServiceBusTriggerFunctionAttribute : System.Attribute + { + public NServiceBusTriggerFunctionAttribute(string endpointName) { } + public bool EnableCrossEntityTransactions { get; set; } + public string EndpointName { get; } + public string TriggerFunctionName { get; set; } + } public class ServiceBusTriggeredEndpointConfiguration { public ServiceBusTriggeredEndpointConfiguration(Microsoft.Extensions.Configuration.IConfiguration configuration) { } diff --git a/src/ServiceBus.Tests/FunctionEndpointComponent.cs b/src/ServiceBus.Tests/FunctionEndpointComponent.cs index 85872d14..a5b5f21e 100644 --- a/src/ServiceBus.Tests/FunctionEndpointComponent.cs +++ b/src/ServiceBus.Tests/FunctionEndpointComponent.cs @@ -2,20 +2,14 @@ { using System; using System.Collections.Generic; - using System.IO; using System.Linq; - using System.Reflection; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.DependencyInjection; using NServiceBus; using NServiceBus.AcceptanceTesting; using NServiceBus.AcceptanceTesting.Customization; using NServiceBus.AcceptanceTesting.Support; - using NServiceBus.MessageInterfaces.MessageMapper.Reflection; - using NServiceBus.Serialization; - using NServiceBus.Settings; using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext; abstract class FunctionEndpointComponent : IComponentBehavior @@ -57,9 +51,6 @@ public FunctionRunner( this.scenarioContext = scenarioContext; this.functionComponentType = functionComponentType; Name = functionComponentType.FullName; - - var serializer = new NewtonsoftSerializer(); - messageSerializer = serializer.Configure(new SettingsHolder())(new MessageMapper()); } public override string Name { get; } @@ -107,9 +98,9 @@ public override async Task ComponentsStarted(CancellationToken token) { foreach (var message in messages) { - var transportMessage = GenerateMessage(message); + var transportMessage = MessageHelper.GenerateMessage(message); var context = new ExecutionContext(); - await endpoint.Process(transportMessage, context); + await endpoint.ProcessNonTransactional(transportMessage, context, null); } } @@ -123,37 +114,11 @@ public override Task Stop() return base.Stop(); } - Message GenerateMessage(object message) - { - Message asbMessage; - using (var stream = new MemoryStream()) - { - messageSerializer.Serialize(message, stream); - asbMessage = new Message(stream.ToArray()); - } - - asbMessage.UserProperties["NServiceBus.EnclosedMessageTypes"] = message.GetType().FullName; - - var systemProperties = new Message.SystemPropertiesCollection(); - // sequence number is required to prevent SystemPropertiesCollection from throwing on the getters - var fieldInfo = typeof(Message.SystemPropertiesCollection).GetField("sequenceNumber", BindingFlags.NonPublic | BindingFlags.Instance); - fieldInfo.SetValue(systemProperties, 123); - // set delivery count to 1 - var deliveryCountProperty = typeof(Message.SystemPropertiesCollection).GetProperty("DeliveryCount"); - deliveryCountProperty.SetValue(systemProperties, 1); - // assign test message mocked system properties - var property = typeof(Message).GetProperty("SystemProperties"); - property.SetValue(asbMessage, systemProperties); - - return asbMessage; - } - readonly Action configurationCustomization; readonly ScenarioContext scenarioContext; readonly Type functionComponentType; IList messages; FunctionEndpoint endpoint; - IMessageSerializer messageSerializer; } } } \ No newline at end of file diff --git a/src/ServiceBus.Tests/FunctionEndpointTests.cs b/src/ServiceBus.Tests/FunctionEndpointTests.cs new file mode 100644 index 00000000..9cfff3e0 --- /dev/null +++ b/src/ServiceBus.Tests/FunctionEndpointTests.cs @@ -0,0 +1,172 @@ +namespace ServiceBus.Tests +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using System.Transactions; + using NServiceBus; + using NServiceBus.AzureFunctions.InProcess.ServiceBus; + using NServiceBus.Transport; + using NUnit.Framework; + + [TestFixture] + public class FunctionEndpointTests + { + [Test] + public async Task When_processing_successful_should_complete_message() + { + var message = MessageHelper.GenerateMessage(new TestMessage()); + MessageContext messageContext = null; + var pipelineInvoker = await CreatePipeline( + ctx => + { + messageContext = ctx; + return Task.CompletedTask; + }); + + var transactionStrategy = new TestableFunctionTransactionStrategy(); + + await FunctionEndpoint.Process( + message, + transactionStrategy, + pipelineInvoker); + + Assert.IsTrue(transactionStrategy.OnCompleteCalled); + Assert.AreSame(message.GetMessageId(), messageContext.MessageId); + Assert.AreSame(message.Body, messageContext.Body); + CollectionAssert.IsSubsetOf(message.GetHeaders(), messageContext.Headers); // the IncomingMessage has an additional MessageId header + Assert.AreEqual(1, transactionStrategy.CreatedTransportTransactions.Count); + Assert.AreSame(transactionStrategy.CreatedTransportTransactions[0], messageContext.TransportTransaction); + } + + [Test] + public async Task When_processing_fails_should_provide_error_context() + { + var message = MessageHelper.GenerateMessage(new TestMessage()); + var pipelineException = new Exception("test exception"); + ErrorContext errorContext = null; + var pipelineInvoker = await CreatePipeline( + _ => throw pipelineException, + errCtx => + { + errorContext = errCtx; + return Task.FromResult(ErrorHandleResult.Handled); + }); + + var transactionStrategy = new TestableFunctionTransactionStrategy(); + + await FunctionEndpoint.Process( + message, + transactionStrategy, + pipelineInvoker); + + Assert.AreSame(pipelineException, errorContext.Exception); + Assert.AreSame(message.GetMessageId(), errorContext.Message.NativeMessageId); + Assert.AreSame(message.Body, errorContext.Message.Body); + CollectionAssert.IsSubsetOf(message.GetHeaders(), errorContext.Message.Headers); // the IncomingMessage has an additional MessageId header + Assert.AreSame(transactionStrategy.CreatedTransportTransactions.Last(), errorContext.TransportTransaction); // verify usage of the correct transport transaction instance + Assert.AreEqual(2, transactionStrategy.CreatedTransportTransactions.Count); // verify that a new transport transaction has been created for the error handling + } + + [Test] + public async Task When_error_pipeline_fails_should_throw() + { + var errorPipelineException = new Exception("error pipeline failure"); + var pipelineInvoker = await CreatePipeline( + _ => throw new Exception("main pipeline failure"), + _ => throw errorPipelineException); + + var transactionStrategy = new TestableFunctionTransactionStrategy(); + + var exception = Assert.ThrowsAsync(async () => + await FunctionEndpoint.Process( + MessageHelper.GenerateMessage(new TestMessage()), + transactionStrategy, + pipelineInvoker)); + + Assert.IsFalse(transactionStrategy.OnCompleteCalled); + Assert.AreSame(errorPipelineException, exception); + } + + [Test] + public async Task When_error_pipeline_handles_error_should_complete_message() + { + var pipelineInvoker = await CreatePipeline( + _ => throw new Exception("main pipeline failure"), + _ => Task.FromResult(ErrorHandleResult.Handled)); + + var transactionStrategy = new TestableFunctionTransactionStrategy(); + + await FunctionEndpoint.Process( + MessageHelper.GenerateMessage(new TestMessage()), + transactionStrategy, + pipelineInvoker); + + Assert.IsTrue(transactionStrategy.OnCompleteCalled); + } + + [Test] + public async Task When_error_pipeline_requires_retry_should_throw() + { + var mainPipelineException = new Exception("main pipeline failure"); + var pipelineInvoker = await CreatePipeline( + _ => throw mainPipelineException, + _ => Task.FromResult(ErrorHandleResult.RetryRequired)); + + var transactionStrategy = new TestableFunctionTransactionStrategy(); + + var exception = Assert.ThrowsAsync(async () => + await FunctionEndpoint.Process( + MessageHelper.GenerateMessage(new TestMessage()), + transactionStrategy, + pipelineInvoker)); + + Assert.IsFalse(transactionStrategy.OnCompleteCalled); + Assert.AreSame(mainPipelineException, exception); + } + + static async Task CreatePipeline(Func mainPipeline = null, Func> errorPipeline = null) + { + var pipelineInvoker = new PipelineInvoker(); + await (pipelineInvoker as IPushMessages) + .Init( + mainPipeline ?? (_ => Task.CompletedTask), + errorPipeline ?? (_ => Task.FromResult(ErrorHandleResult.Handled)), + null, null); + return pipelineInvoker; + } + + class TestMessage + { + } + + class TestableFunctionTransactionStrategy : ITransactionStrategy + { + public bool OnCompleteCalled { get; private set; } + public CommittableTransaction CompletedTransaction { get; private set; } + public CommittableTransaction CreatedTransaction { get; private set; } + public List CreatedTransportTransactions { get; } = new List(); + + public Task Complete(CommittableTransaction transaction) + { + OnCompleteCalled = true; + CompletedTransaction = transaction; + return Task.CompletedTask; + } + + public CommittableTransaction CreateTransaction() + { + CreatedTransaction = new CommittableTransaction(); + return CreatedTransaction; + } + + public TransportTransaction CreateTransportTransaction(CommittableTransaction transaction) + { + var transportTransaction = new TransportTransaction(); + CreatedTransportTransactions.Add(transportTransaction); + return transportTransaction; + } + } + } +} \ No newline at end of file diff --git a/src/ServiceBus.Tests/MessageHelper.cs b/src/ServiceBus.Tests/MessageHelper.cs new file mode 100644 index 00000000..8416a293 --- /dev/null +++ b/src/ServiceBus.Tests/MessageHelper.cs @@ -0,0 +1,43 @@ +namespace ServiceBus.Tests +{ + using System; + using System.IO; + using System.Reflection; + using Microsoft.Azure.ServiceBus; + using NServiceBus; + using NServiceBus.MessageInterfaces.MessageMapper.Reflection; + using NServiceBus.Serialization; + using NServiceBus.Settings; + + public class MessageHelper + { + static NewtonsoftSerializer serializer = new NewtonsoftSerializer(); + static IMessageSerializer messageSerializer = serializer.Configure(new SettingsHolder())(new MessageMapper()); + + public static Message GenerateMessage(object message) + { + Message asbMessage; + using (var stream = new MemoryStream()) + { + messageSerializer.Serialize(message, stream); + asbMessage = new Message(stream.ToArray()); + } + + asbMessage.UserProperties["NServiceBus.EnclosedMessageTypes"] = message.GetType().FullName; + + var systemProperties = new Message.SystemPropertiesCollection(); + // sequence number is required to prevent SystemPropertiesCollection from throwing on the getters + var fieldInfo = typeof(Message.SystemPropertiesCollection).GetField("sequenceNumber", BindingFlags.NonPublic | BindingFlags.Instance); + fieldInfo.SetValue(systemProperties, 123); + // set delivery count to 1 + var deliveryCountProperty = typeof(Message.SystemPropertiesCollection).GetProperty("DeliveryCount"); + deliveryCountProperty.SetValue(systemProperties, 1); + // assign test message mocked system properties + var property = typeof(Message).GetProperty("SystemProperties"); + property.SetValue(asbMessage, systemProperties); + + asbMessage.MessageId = Guid.NewGuid().ToString("N"); + return asbMessage; + } + } +} \ No newline at end of file diff --git a/src/ServiceBus.Tests/ReflectionHelperTests.cs b/src/ServiceBus.Tests/ReflectionHelperTests.cs new file mode 100644 index 00000000..bea7620f --- /dev/null +++ b/src/ServiceBus.Tests/ReflectionHelperTests.cs @@ -0,0 +1,148 @@ +namespace ServiceBus.Tests +{ + using System; + using Microsoft.Azure.ServiceBus; + using Microsoft.Azure.WebJobs; + using NServiceBus.AzureFunctions.InProcess.ServiceBus; + using NUnit.Framework; + + [TestFixture] + public class ReflectionHelperTests + { + [Test] + public void When_no_attributes_defined_should_throw() + { + var exception = Assert.Throws(() => ReflectionHelper.GetAutoCompleteValue()); + + StringAssert.Contains($"Could not locate {nameof(ServiceBusTriggerAttribute)} to infer the AutoComplete setting.", exception.Message); + } + + [Test] + public void When_no_function_name_attribute_defined_should_throw() + { + var exception = Assert.Throws(() => FunctionWithNoFunctionNameAttribute(null)); + + StringAssert.Contains($"Could not locate {nameof(ServiceBusTriggerAttribute)} to infer the AutoComplete setting.", exception.Message); + + void FunctionWithNoFunctionNameAttribute( + [ServiceBusTrigger("queueName", "subscriptionname", AutoComplete = true)] Message _) + { + ReflectionHelper.GetAutoCompleteValue(); + } + } + + [Test] + public void When_no_trigger_attribute_defined_should_throw() + { + var exception = Assert.Throws(() => FunctionWithNoServiceBusTriggerAttribute(null)); + + StringAssert.Contains($"Could not locate {nameof(ServiceBusTriggerAttribute)} to infer the AutoComplete setting.", exception.Message); + + [FunctionName("TestFunction")] + void FunctionWithNoServiceBusTriggerAttribute( + Message _) + { + ReflectionHelper.GetAutoCompleteValue(); + } + } + + [Test] + public void When_auto_complete_set_to_false_should_return_false() + { + FunctionTriggerWithAutoCompleteExplicitlySetToFalse(null); + + [FunctionName("TestFunction")] + void FunctionTriggerWithAutoCompleteExplicitlySetToFalse( + [ServiceBusTrigger("queueName", "subscriptionname", AutoComplete = false)] Message _) + { + Assert.IsFalse(ReflectionHelper.GetAutoCompleteValue()); + } + } + + [Test] + public void When_auto_complete_set_to_true_should_return_true() + { + FunctionTriggerWithAutoCompleteExplicitlySetToTrue(null); + + [FunctionName("TestFunction")] + void FunctionTriggerWithAutoCompleteExplicitlySetToTrue( + [ServiceBusTrigger("queueName", "subscriptionname", AutoComplete = true)] Message _) + { + Assert.IsTrue(ReflectionHelper.GetAutoCompleteValue()); + } + } + + [Test] + public void When_auto_complete_not_set_should_return_true() + { + FunctionTriggerWithoutAutoCompleteConfiguration(null); + + [FunctionName("TestFunction")] + void FunctionTriggerWithoutAutoCompleteConfiguration( + [ServiceBusTrigger("queueName", "subscriptionname")] Message _) + { + Assert.True(ReflectionHelper.GetAutoCompleteValue()); + } + } + + [Test] + public void When_helper_invoked_in_nested_methods() + { + NestedTrigger(null); + + [FunctionName("TestFunction")] + void NestedTrigger( + [ServiceBusTrigger("queueName", "subscriptionname", AutoComplete = false)] Message _) + { + One(); + } + + void One() + { + Two(); + } + + void Two() + { + Three(); + } + + void Three() + { + Assert.IsFalse(ReflectionHelper.GetAutoCompleteValue()); + } + } + + [Test] + public void When_helper_invoked_in_local_function() + { + LocalFunction(null); + + [FunctionName("TestFunction")] + void LocalFunction( + [ServiceBusTrigger("queueName", "subscriptionname", AutoComplete = false)] Message _) + { + LocalFunction(); + + void LocalFunction() + { + Assert.IsFalse(ReflectionHelper.GetAutoCompleteValue()); + } + } + } + + [Test] + public void When_helper_invoked_in_expression() + { + Expression(null); + + [FunctionName("TestFunction")] + void Expression( + [ServiceBusTrigger("queueName", "subscriptionname", AutoComplete = false)] Message _) + { + Func expression = () => ReflectionHelper.GetAutoCompleteValue(); + Assert.IsFalse(expression()); + } + } + } +} \ No newline at end of file diff --git a/src/ServiceBus.Tests/ServiceBus.Tests.csproj b/src/ServiceBus.Tests/ServiceBus.Tests.csproj index 22cc3d0f..dfe67d94 100644 --- a/src/ServiceBus.Tests/ServiceBus.Tests.csproj +++ b/src/ServiceBus.Tests/ServiceBus.Tests.csproj @@ -4,6 +4,7 @@ netcoreapp3.1;net5.0 true ..\Test.snk + 9 diff --git a/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs b/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs index da2e6ec5..23dd7501 100644 --- a/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs +++ b/src/ServiceBus.Tests/When_shipping_handlers_in_dedicated_assembly.cs @@ -42,7 +42,7 @@ public async Task Should_load_handlers_from_assembly_when_using_FunctionsHostBui // we need to process an actual message to have the endpoint being created - await endpoint.Process(GenerateMessage(), new ExecutionContext()); + await endpoint.ProcessNonTransactional(GenerateMessage(), new ExecutionContext(), null); // The message handler assembly should be loaded now because scanning should find and load the handler assembly Assert.True(AppDomain.CurrentDomain.GetAssemblies().Any(a => a.FullName == "Testing.Handlers, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"));