Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Logging;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Transport;
using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;
Expand All @@ -29,8 +30,17 @@ internal FunctionEndpoint(IStartableEndpointWithExternallyManagedContainer exter
endpointFactory = _ => externallyManagedContainerEndpoint.Start(serviceProvider);
}

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. This method will lookup the <see cref="ServiceBusTriggerAttribute.AutoComplete"/> setting to determine whether to use transactional or non-transactional processing.
/// </summary>
public Task Process(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null) =>
ReflectionHelper.GetAutoCompleteValue()
? ProcessNonTransactional(message, executionContext, messageReceiver, functionsLogger)
: ProcessTransactional(message, executionContext, messageReceiver, functionsLogger);

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. All messages are committed transactionally with the successful processing of the incoming message.
/// <remarks>Requires <see cref="ServiceBusTriggerAttribute.AutoComplete"/> to be set to false!</remarks>
/// </summary>
public async Task ProcessTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null)
{
Expand All @@ -57,6 +67,15 @@ await InitializeEndpointIfNecessary(functionExecutionContext, CancellationToken.
/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
public Task ProcessNonTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null) => Process(message, executionContext, functionsLogger);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be calling in to an obsolete method? We're going to have to fix it as soon as we update to 2.0 anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was just to not duplicate the code because the obsoleted message still needs to work and it can't call ProcessNonTransactional because of the missing receiver (or pass null but I didn't like that). I'm fine either way.


/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
[ObsoleteEx(
ReplacementTypeOrMember = "Process(Message, ExecutionContext, IMessageReceiver, ILogger)",
TreatAsErrorFromVersion = "2",
RemoveInVersion = "3")]
public async Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
public interface IFunctionEndpoint
{
/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. All messages are committed transactionally with the successful processing of the incoming message.
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. This method will lookup the <see cref="ServiceBusTriggerAttribute.AutoComplete"/> setting to determine whether to use transactional or non-transactional processing.
/// </summary>
Task ProcessTransactional(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null);
Task Process(Message message, ExecutionContext executionContext, IMessageReceiver messageReceiver, ILogger functionsLogger = null);

/// <summary>
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
[ObsoleteEx(
ReplacementTypeOrMember = "Process(Message, ExecutionContext, IMessageReceiver, ILogger)",
TreatAsErrorFromVersion = "2",
RemoveInVersion = "3")]
Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace NServiceBus.AzureFunctions.InProcess.ServiceBus
{
using System;
using System.Diagnostics;
using System.Reflection;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;

class ReflectionHelper
{
public static bool GetAutoCompleteValue()
{
var triggerAttribute = FindTriggerAttributeInternal();
if (triggerAttribute != null)
{
return triggerAttribute.AutoComplete;
}

throw new Exception($"Could not locate {nameof(ServiceBusTriggerAttribute)} to infer the AutoComplete setting. Make sure that the function trigger contains a parameter decorated with {nameof(ServiceBusTriggerAttribute)} or use the advanced APIs exposed via the {nameof(FunctionEndpoint)} type instead.");
}

public static ServiceBusTriggerAttribute FindBusTriggerAttribute() => FindTriggerAttributeInternal();

static ServiceBusTriggerAttribute FindTriggerAttributeInternal()
{
var st = new StackTrace(skipFrames: 2); // skip first two frames because it is this method + the public method
var frames = st.GetFrames();
foreach (var frame in frames)
{
var method = frame?.GetMethod();
if (method?.GetCustomAttribute<FunctionNameAttribute>(false) != null)
{
foreach (var parameter in method.GetParameters())
{
ServiceBusTriggerAttribute serviceBusTriggerAttribute;
if (parameter.ParameterType == typeof(Message)
&& (serviceBusTriggerAttribute = parameter.GetCustomAttribute<ServiceBusTriggerAttribute>(false)) != null)
{
return serviceBusTriggerAttribute;
}
}

return null;
}
}

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using AzureFunctions.InProcess.ServiceBus;
using Logging;
using Microsoft.Azure.WebJobs;
using Serialization;
using Transport;

Expand Down Expand Up @@ -77,10 +76,10 @@ public ServiceBusTriggeredEndpointConfiguration(string endpointName, string conn
/// </summary>
public static ServiceBusTriggeredEndpointConfiguration FromAttributes()
{
var configuration = TriggerDiscoverer.TryGet<ServiceBusTriggerAttribute>();
if (configuration != null)
var serviceBusTriggerAttribute = ReflectionHelper.FindBusTriggerAttribute();
if (serviceBusTriggerAttribute != null)
{
return new ServiceBusTriggeredEndpointConfiguration(configuration.QueueName, configuration.Connection);
return new ServiceBusTriggeredEndpointConfiguration(serviceBusTriggerAttribute.QueueName, serviceBusTriggerAttribute.Connection);
}

throw new Exception(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand All @@ -23,6 +23,6 @@ class FunctionEndpointTrigger
ILogger logger,
ExecutionContext executionContext)
{
await endpoint.Process(message, executionContext, logger);
await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand All @@ -23,6 +23,6 @@ class FunctionEndpointTrigger
ILogger logger,
ExecutionContext executionContext)
{
await endpoint.Process(message, executionContext, logger);
await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand All @@ -23,6 +23,6 @@ class FunctionEndpointTrigger
ILogger logger,
ExecutionContext executionContext)
{
await endpoint.Process(message, executionContext, logger);
await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ using NServiceBus;

class FunctionEndpointTrigger
{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{
this.endpoint = endpoint;
}
Expand All @@ -23,6 +23,6 @@ class FunctionEndpointTrigger
ILogger logger,
ExecutionContext executionContext)
{
await endpoint.Process(message, executionContext, logger);
await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ public void Execute(GeneratorExecutionContext context)

class FunctionEndpointTrigger
{{
readonly IFunctionEndpoint endpoint;
readonly FunctionEndpoint endpoint;

public FunctionEndpointTrigger(IFunctionEndpoint endpoint)
public FunctionEndpointTrigger(FunctionEndpoint endpoint)
{{
this.endpoint = endpoint;
}}
Expand All @@ -133,7 +133,7 @@ public async Task Run(
{{
{(syntaxReceiver.enableCrossEntityTransactions
? "await endpoint.ProcessTransactional(message, executionContext, messageReceiver, logger);"
: "await endpoint.Process(message, executionContext, logger);")}
: "await endpoint.ProcessNonTransactional(message, executionContext, messageReceiver, logger);")}
}}
}}";
context.AddSource("NServiceBus__FunctionEndpointTrigger", SourceText.From(source, Encoding.UTF8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ namespace NServiceBus
{
public class FunctionEndpoint : NServiceBus.IFunctionEndpoint
{
[System.Obsolete("Use `Process(Message, ExecutionContext, IMessageReceiver, ILogger)` instead. Will" +
" be treated as an error from version 2.0.0. Will be removed in version 3.0.0.", false)]
public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task ProcessNonTransactional(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task ProcessTransactional(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
Expand All @@ -30,8 +34,10 @@ namespace NServiceBus
}
public interface IFunctionEndpoint
{
[System.Obsolete("Use `Process(Message, ExecutionContext, IMessageReceiver, ILogger)` instead. Will" +
" be treated as an error from version 2.0.0. Will be removed in version 3.0.0.", false)]
System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task ProcessTransactional(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Azure.ServiceBus.Core.IMessageReceiver messageReceiver, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceBus.Tests/FunctionEndpointComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public override async Task ComponentsStarted(CancellationToken token)
{
var transportMessage = MessageHelper.GenerateMessage(message);
var context = new ExecutionContext();
await endpoint.Process(transportMessage, context);
await endpoint.ProcessNonTransactional(transportMessage, context, null);
}
}

Expand Down
Loading