Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
27fa1db
first draft of transaction support
timbussmann Jul 27, 2021
f848ef7
add missing logging integration
timbussmann Jul 28, 2021
440e356
tweaks
timbussmann Jul 28, 2021
12a57dc
Add support for cross-entity transactions configuration via attribute
SeanFeldman Jul 29, 2021
69dc522
Approve API
SeanFeldman Jul 29, 2021
31a8f1b
switch invoked method based on transaction setting
timbussmann Jul 29, 2021
1873644
please code style
timbussmann Jul 29, 2021
efcc044
make the generated code valid syntax
timbussmann Jul 29, 2021
8849290
update approval files
timbussmann Jul 29, 2021
e944564
verify generated trigger compiles without errors
timbussmann Jul 29, 2021
3cd1b6c
remove optional ctor parameter for public attribute properties
timbussmann Jul 29, 2021
9ecd740
update tests
timbussmann Jul 29, 2021
3124cc0
cleanup tests
timbussmann Jul 29, 2021
4ecebe1
obsolete NServiceBusEndpointNameAttribute
timbussmann Jul 29, 2021
3c64b9a
suppress CS0618
timbussmann Jul 29, 2021
431bac2
Merge pull request #240 from Particular/attribute-support-for-transac…
SeanFeldman Jul 29, 2021
cd5171d
use single process implementation
timbussmann Aug 2, 2021
ae0b19f
delete comment
timbussmann Aug 2, 2021
566b21e
Native message may not have a messageId
mikeminutillo Aug 3, 2021
2c0d749
Extract transaction strategy
mikeminutillo Aug 3, 2021
b6c8700
use single process implementation
timbussmann Aug 2, 2021
2def400
delete comment
timbussmann Aug 2, 2021
8ec81e7
Native message may not have a messageId
mikeminutillo Aug 3, 2021
d47414e
add some unit tests around the Process implementation
timbussmann Aug 3, 2021
27a7186
Extract transaction strategy
mikeminutillo Aug 3, 2021
2f9e9db
Fix tests
mikeminutillo Aug 6, 2021
e1888ed
Merge branch 'extract-transaction-strategy' of https://github.com/Par…
mikeminutillo Aug 6, 2021
d42d229
Refactor
mikeminutillo Aug 6, 2021
f74c8dc
Merge pull request #243 from Particular/extract-transaction-strategy
mikeminutillo Aug 6, 2021
aeb893f
Auto-detect AutoComplete setting (#244)
timbussmann Aug 6, 2021
0a5895f
Update src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBu…
timbussmann Aug 9, 2021
64e5453
use explicit interface impl. to hide Process on FunctionEndpoint
timbussmann Aug 9, 2021
48aa83d
inline SafeCompleteAsync
timbussmann Aug 9, 2021
942c089
update attribute property name
timbussmann Aug 9, 2021
f058b33
use existing function name
timbussmann Aug 9, 2021
df97eed
approve API changes
timbussmann Aug 9, 2021
512fce2
Merge remote-tracking branch 'origin/master' into transactions
timbussmann Aug 9, 2021
cf58c4c
prevent potential NRE
timbussmann Aug 9, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<Weavers GenerateXsd="false">
<Obsolete />
</Weavers>
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
using Extensibility;
using Logging;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Transport;
using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;
Expand All @@ -28,54 +30,118 @@ internal FunctionEndpoint(IStartableEndpointWithExternallyManagedContainer exter
endpointFactory = _ => externallyManagedContainerEndpoint.Start(serviceProvider);
}

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. This method will lookup the <see cref="ServiceBusTriggerAttribute.AutoComplete"/> setting to determine whether to use transactional or non-transactional processing.
/// </summary>
Task IFunctionEndpoint.Process(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger) =>
ReflectionHelper.GetAutoCompleteValue()
? ProcessNonTransactional(message, executionContext, messageReceiver, functionsLogger)
: ProcessTransactional(message, executionContext, messageReceiver, functionsLogger);

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. All messages are committed transactionally with the successful processing of the incoming message.
/// <remarks>Requires <see cref="ServiceBusTriggerAttribute.AutoComplete"/> to be set to false!</remarks>
/// </summary>
public async Task ProcessTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);

try
{
await InitializeEndpointIfNecessary(functionExecutionContext, CancellationToken.None)
.ConfigureAwait(false);

await Process(message, new MessageReceiverTransactionStrategy(message, messageReceiver), pipeline)
.ConfigureAwait(false);
}
catch (Exception)
{
// abandon message outside of a transaction scope to ensure the abandon operation can't be rolled back
await messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
throw;
}
}

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
public Task ProcessNonTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null) => Process(message, executionContext, functionsLogger);

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
[ObsoleteEx(
ReplacementTypeOrMember = "Process(Message, ExecutionContext, IMessageReceiver, ILogger)",
TreatAsErrorFromVersion = "2",
RemoveInVersion = "3")]
public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

var messageContext = CreateMessageContext(message);
var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);

await InitializeEndpointIfNecessary(functionExecutionContext,
messageContext.ReceiveCancellationTokenSource.Token).ConfigureAwait(false);
await InitializeEndpointIfNecessary(functionExecutionContext, CancellationToken.None)
.ConfigureAwait(false);

await Process(message, NoTransactionStrategy.Instance, pipeline)
.ConfigureAwait(false);
}

internal static async Task Process(Message message, ITransactionStrategy transactionStrategy, PipelineInvoker pipeline)
{
var messageId = message.GetMessageId();

try
{
await pipeline.PushMessage(messageContext).ConfigureAwait(false);
using (var transaction = transactionStrategy.CreateTransaction())
{
var transportTransaction = transactionStrategy.CreateTransportTransaction(transaction);
var messageContext = CreateMessageContext(transportTransaction);

await pipeline.PushMessage(messageContext).ConfigureAwait(false);

await transactionStrategy.Complete(transaction).ConfigureAwait(false);

transaction?.Commit();
}
}
catch (Exception exception)
{
var errorContext = new ErrorContext(
exception,
message.GetHeaders(),
messageContext.MessageId,
messageContext.Body,
new TransportTransaction(),
message.SystemProperties.DeliveryCount);
using (var transaction = transactionStrategy.CreateTransaction())
{
var transportTransaction = transactionStrategy.CreateTransportTransaction(transaction);
var errorContext = new ErrorContext(
exception,
message.GetHeaders(),
messageId,
message.Body,
transportTransaction,
message.SystemProperties.DeliveryCount);

var errorHandleResult = await pipeline.PushFailedMessage(errorContext).ConfigureAwait(false);

if (errorHandleResult == ErrorHandleResult.Handled)
{
await transactionStrategy.Complete(transaction).ConfigureAwait(false);

var errorHandleResult = await pipeline.PushFailedMessage(errorContext).ConfigureAwait(false);
transaction?.Commit();
return;
}

if (errorHandleResult == ErrorHandleResult.Handled)
{
// return to signal to the Functions host it can complete the incoming message
return;
throw;
}

throw;
}

MessageContext CreateMessageContext(Message originalMessage)
{
return new MessageContext(
originalMessage.GetMessageId(),
originalMessage.GetHeaders(),
originalMessage.Body,
new TransportTransaction(),
MessageContext CreateMessageContext(TransportTransaction transportTransaction) =>
new MessageContext(
messageId,
message.GetHeaders(),
message.Body,
transportTransaction,
new CancellationTokenSource(),
new ContextBag());
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

Expand All @@ -12,9 +13,18 @@
/// </summary>
public interface IFunctionEndpoint
{
/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. This method will lookup the <see cref="ServiceBusTriggerAttribute.AutoComplete"/> setting to determine whether to use transactional or non-transactional processing.
/// </summary>
Task Process(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null);

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
[ObsoleteEx(
ReplacementTypeOrMember = "Process(Message, ExecutionContext, IMessageReceiver, ILogger)",
TreatAsErrorFromVersion = "2",
RemoveInVersion = "3")]
Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="[2.3.0, 3.0.0)" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="[1.9.0, 2.0.0)" />
<PackageReference Include="NServiceBus.Extensions.DependencyInjection" Version="[1.0.1, 2.0.0)" />
<PackageReference Include="Obsolete.Fody" Version="5.2.1" PrivateAssets="All"/>
<PackageReference Include="Particular.Packaging" Version="1.2.1" PrivateAssets="All" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
/// Assembly attribute to specify NServiceBus logical endpoint name.
/// This name is used to wire up an auto-generated service bus trigger function, responding to messages in the queue specified by the name provided.
///</summary>
[ObsoleteEx(
ReplacementTypeOrMember = nameof(NServiceBusTriggerFunctionAttribute),
TreatAsErrorFromVersion = "2",
RemoveInVersion = "3")]
[System.AttributeUsage(System.AttributeTargets.Assembly)]
public sealed class NServiceBusEndpointNameAttribute : System.Attribute
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace NServiceBus
{
///<summary>
/// Assembly attribute to configure generated NServiceBus Azure Function.
/// The attribute is used to wire up an auto-generated Service Bus trigger function, responding to messages in the queue specified by the name provided.
///</summary>
[System.AttributeUsage(System.AttributeTargets.Assembly)]
public sealed class NServiceBusTriggerFunctionAttribute : System.Attribute
{
/// <summary>
/// Endpoint name that is the input queue name.
/// </summary>
public string EndpointName { get; }

/// <summary>
/// Override trigger function name.
/// </summary>
public string TriggerFunctionName { get; set; }

/// <summary>
/// Enable cross-entity transactions.
/// </summary>
public bool EnableCrossEntityTransactions { get; set; }

/// <summary>
/// Endpoint logical name.
/// </summary>
/// <param name="endpointName">Endpoint name that is the input queue name.</param>
public NServiceBusTriggerFunctionAttribute(string endpointName)
{
EndpointName = endpointName;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace NServiceBus.AzureFunctions.InProcess.ServiceBus
{
using System;
using System.Diagnostics;
using System.Reflection;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;

class ReflectionHelper
{
public static bool GetAutoCompleteValue()
{
var triggerAttribute = FindTriggerAttributeInternal();
if (triggerAttribute != null)
{
return triggerAttribute.AutoComplete;
}

throw new Exception($"Could not locate {nameof(ServiceBusTriggerAttribute)} to infer the AutoComplete setting. Make sure that the function trigger contains a parameter decorated with {nameof(ServiceBusTriggerAttribute)} or use the advanced APIs exposed via the {nameof(FunctionEndpoint)} type instead.");
}

public static ServiceBusTriggerAttribute FindBusTriggerAttribute() => FindTriggerAttributeInternal();

static ServiceBusTriggerAttribute FindTriggerAttributeInternal()
{
var st = new StackTrace(skipFrames: 2); // skip first two frames because it is this method + the public method
var frames = st.GetFrames();
foreach (var frame in frames)
{
var method = frame?.GetMethod();
if (method?.GetCustomAttribute<FunctionNameAttribute>(false) != null)
{
foreach (var parameter in method.GetParameters())
{
ServiceBusTriggerAttribute serviceBusTriggerAttribute;
if (parameter.ParameterType == typeof(Message)
&& (serviceBusTriggerAttribute = parameter.GetCustomAttribute<ServiceBusTriggerAttribute>(false)) != null)
{
return serviceBusTriggerAttribute;
}
}

return null;
}
}

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using AzureFunctions.InProcess.ServiceBus;
using Logging;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Serialization;
using Transport;
Expand Down Expand Up @@ -134,10 +133,10 @@ static string GetConfiguredValueOrFallback(IConfiguration configuration, string
/// </summary>
public static ServiceBusTriggeredEndpointConfiguration FromAttributes()
{
var configuration = TriggerDiscoverer.TryGet<ServiceBusTriggerAttribute>();
if (configuration != null)
var serviceBusTriggerAttribute = ReflectionHelper.FindBusTriggerAttribute();
if (serviceBusTriggerAttribute != null)
{
return new ServiceBusTriggeredEndpointConfiguration(configuration.QueueName, configuration.Connection);
return new ServiceBusTriggeredEndpointConfiguration(serviceBusTriggerAttribute.QueueName, serviceBusTriggerAttribute.Connection);
}

throw new Exception(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace NServiceBus
{
using System.Threading.Tasks;
using System.Transactions;
using Transport;

interface ITransactionStrategy
{
CommittableTransaction CreateTransaction();
TransportTransaction CreateTransportTransaction(CommittableTransaction transaction);
Task Complete(CommittableTransaction transaction);
}
}
Loading