Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.Diagnostics;
Expand Down
150 changes: 123 additions & 27 deletions src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
namespace NServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AzureFunctions;
using System.Transactions;
using AzureFunctions.ServiceBus;
using Extensibility;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Extensions.Logging;
using Transport;
using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;
Expand All @@ -27,49 +29,143 @@ public FunctionEndpoint(Func<FunctionExecutionContext, ServiceBusTriggeredEndpoi
/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null)
public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null, IMessageReceiver messageReceiver = null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider creating a dedicated overload where the message receiver is not optional (and maybe even name it differently)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What benefits will it provide?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better indication to the user what it does to provide this parameter

{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

var messageContext = CreateMessageContext(message);
// When the message receiver is provided, we assume TransportTransactionMode.SendsAtomicWithReceive to be used
var useTransaction = messageReceiver != null;
var transportTransactionMode = useTransaction ? TransportTransactionMode.SendsAtomicWithReceive : TransportTransactionMode.ReceiveOnly;

var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);

var lockToken = message.SystemProperties.LockToken;
string messageId;
Dictionary<string, string> headers;
byte[] body;

try
{
await Process(messageContext, functionExecutionContext).ConfigureAwait(false);
messageId = message.GetMessageId();
headers = message.GetHeaders();
body = message.GetBody();
}
catch (Exception exception)
{
var errorContext = new ErrorContext(
exception,
message.GetHeaders(),
messageContext.MessageId,
messageContext.Body,
new TransportTransaction(),
message.SystemProperties.DeliveryCount);

var errorHandleResult = await ProcessFailedMessage(errorContext, functionExecutionContext)
.ConfigureAwait(false);

if (errorHandleResult == ErrorHandleResult.Handled)
try
{
await messageReceiver.SafeDeadLetterAsync(transportTransactionMode, lockToken, deadLetterReason: "Poisoned message", deadLetterErrorDescription: exception.Message).ConfigureAwait(false);
}
catch (Exception ex)
{
// nothing we can do about it, message will be retried
functionExecutionContext.Logger.LogDebug($"Failed to move a poisonous message with native ID: `{message.MessageId}` to the dead-letter queue. Message will be retried.", ex);
}

return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we would rethrow here instead, what would happen given that we moved the message to the DLQ already?

}

try
{
var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey);
var messageContext = CreateMessageContext(message, messageId, headers, body, transportTransaction);

using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null)
{
await Process(messageContext, functionExecutionContext).ConfigureAwait(false);

// Azure Functions auto-completion would be disabled if we try to run in SendsAtomicWithReceive, need to complete message manually
if (useTransaction)
{
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
}

scope?.Complete();
}
}
catch (Exception exception)
{
try
{
ErrorHandleResult result;

using (var scope = useTransaction ? new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled) : null)
{
var transportTransaction = CreateTransportTransaction(useTransaction, messageReceiver, message.PartitionKey);
// provide an unmodified copy of the headers
var errorContext = CreateErrorContext(exception, messageId, message.GetHeaders(), body, transportTransaction, message.SystemProperties.DeliveryCount);

result = await ProcessFailedMessage(errorContext, functionExecutionContext).ConfigureAwait(false);

if (result == ErrorHandleResult.Handled)
{
await messageReceiver.SafeCompleteAsync(transportTransactionMode, lockToken).ConfigureAwait(false);
}

scope?.Complete();
}

if (result == ErrorHandleResult.RetryRequired)
{
await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false);

// Indicate to the Functions runtime not to complete the incoming message
throw;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part should probably not be included in the try/catch because it would be just rethrown again on line 126 with an incorrect log message?

}
}
catch (Exception onErrorException) when (onErrorException is MessageLockLostException || onErrorException is ServiceBusTimeoutException)
{
// return to signal to the Functions host it can complete the incoming message
return;
functionExecutionContext.Logger.LogDebug("Failed to execute recoverability.", onErrorException);

throw;
}
catch (Exception onErrorException)
{
functionExecutionContext.Logger.LogCritical($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorException);

await messageReceiver.SafeAbandonAsync(transportTransactionMode, lockToken).ConfigureAwait(false);

throw;
throw;
}
}
}

MessageContext CreateMessageContext(Message originalMessage)
static MessageContext CreateMessageContext(Message originalMessage, string messageId, Dictionary<string, string> headers, byte[] body, TransportTransaction transportTransaction)
{
var contextBag = new ContextBag();
contextBag.Set(originalMessage);

return new MessageContext(
messageId,
headers,
body,
transportTransaction,
new CancellationTokenSource(),
contextBag);
}

static ErrorContext CreateErrorContext(Exception exception, string messageId, Dictionary<string, string> headers, byte[] body, TransportTransaction transportTransaction, int immediateProcessingFailures)
{
return new ErrorContext(
exception,
headers,
messageId,
body,
transportTransaction,
immediateProcessingFailures);
}

static TransportTransaction CreateTransportTransaction(bool useTransaction, IClientEntity messageReceiver, string incomingQueuePartitionKey)
{
var transportTransaction = new TransportTransaction();

if (useTransaction)
{
return new MessageContext(
originalMessage.GetMessageId(),
originalMessage.GetHeaders(),
originalMessage.Body,
new TransportTransaction(),
new CancellationTokenSource(),
new ContextBag());
transportTransaction.Set((messageReceiver.ServiceBusConnection, messageReceiver.Path));
transportTransaction.Set("IncomingQueue.PartitionKey", incomingQueuePartitionKey);
}

return transportTransaction;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.Threading;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.Threading;
Expand Down
11 changes: 11 additions & 0 deletions src/NServiceBus.AzureFunctions.ServiceBus/MessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.InteropExtensions;

static class MessageExtensions
{
Expand Down Expand Up @@ -40,5 +41,15 @@ public static string GetMessageId(this Message message)

return message.MessageId;
}

public static byte[] GetBody(this Message message)
{
if (message.UserProperties.TryGetValue("NServiceBus.Transport.Encoding", out var value) && value.Equals("wcf/byte-array"))
{
return message.GetBody<byte[]>() ?? Array.Empty<byte>();
}

return message.Body ?? Array.Empty<byte>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Core;

static class MessageReceiverExtensions
{
public static Task SafeCompleteAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken)
{
if (transportTransactionMode != TransportTransactionMode.None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that will never be the case because we only switch between SendOnly and SendsAtomicWithReceive?

{
return messageReceiver.CompleteAsync(lockToken);
Copy link
Contributor

@timbussmann timbussmann Jul 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't that currently throw NREs when not providing a message receiver? (same issue on all the extension methods here)

}

return Task.CompletedTask;
}

public static Task SafeAbandonAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken)
{
if (transportTransactionMode != TransportTransactionMode.None)
{
return messageReceiver.AbandonAsync(lockToken);
}

return Task.CompletedTask;
}

public static Task SafeDeadLetterAsync(this IMessageReceiver messageReceiver, TransportTransactionMode transportTransactionMode, string lockToken, string deadLetterReason, string deadLetterErrorDescription)
{
if (transportTransactionMode != TransportTransactionMode.None)
{
return messageReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription);
}

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using Configuration.AdvancedExtensibility;
using Transport;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using Transport;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.IO;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.Security.Cryptography;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System.Threading.Tasks;
using Transport;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System.Threading.Tasks;
using Transport;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.Threading.Tasks;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using Settings;
using Transport;
Expand All @@ -18,7 +18,8 @@ public ServerlessTransport()
public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString)
{
var baseTransportInfrastructure = baseTransport.Initialize(settings, connectionString);
return new ServerlessTransportInfrastructure<TBaseTransport>(baseTransportInfrastructure, settings);

return new ServerlessTransportInfrastructure(baseTransportInfrastructure, settings);
}

readonly TBaseTransport baseTransport;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
namespace NServiceBus.AzureFunctions
namespace NServiceBus.AzureFunctions.ServiceBus
{
using System;
using System.Collections.Generic;
using Routing;
using Settings;
using Transport;

class ServerlessTransportInfrastructure<TBaseInfrastructure> : TransportInfrastructure
class ServerlessTransportInfrastructure : TransportInfrastructure
{
public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure,
SettingsHolder settings)
public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, SettingsHolder settings)
{
this.baseTransportInfrastructure = baseTransportInfrastructure;
this.settings = settings;
}

public override IEnumerable<Type> DeliveryConstraints =>
baseTransportInfrastructure.DeliveryConstraints;
public override IEnumerable<Type> DeliveryConstraints => baseTransportInfrastructure.DeliveryConstraints;

//support ReceiveOnly so that we can use immediate retries
public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.ReceiveOnly;
public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.SendsAtomicWithReceive;

public override OutboundRoutingPolicy OutboundRoutingPolicy =>
baseTransportInfrastructure.OutboundRoutingPolicy;
public override OutboundRoutingPolicy OutboundRoutingPolicy => baseTransportInfrastructure.OutboundRoutingPolicy;

public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
{
var pipelineInvoker = settings.GetOrCreate<PipelineInvoker>();

return new ManualPipelineInvocationInfrastructure(pipelineInvoker);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Logging;
using Microsoft.Azure.WebJobs;
using System;
using AzureFunctions;
using AzureFunctions.ServiceBus;

/// <summary>
/// Represents a serverless NServiceBus endpoint running within an AzureServiceBus trigger.
Expand Down
Loading