Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/ManualTests.HostV4/SomeEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using NServiceBus;

public class SomeEvent : IEvent
{
public SomeEvent()
{
}
}
15 changes: 15 additions & 0 deletions src/ManualTests.HostV4/SomeEventMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

public class SomeEventMessageHandler : IHandleMessages<SomeEvent>
{
static readonly ILog Log = LogManager.GetLogger<SomeEventMessageHandler>();

public Task Handle(SomeEvent message, IMessageHandlerContext context)
{
Log.Warn($"Handling {nameof(SomeEvent)} in {nameof(SomeEventMessageHandler)}");

return Task.CompletedTask;
}
}
1 change: 0 additions & 1 deletion src/ManualTests.HostV4/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

[assembly: FunctionsStartup(typeof(Startup))]
[assembly: NServiceBusTriggerFunction("InProcess-HostV4", SendsAtomicWithReceive = true)]

public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
Expand Down
5 changes: 3 additions & 2 deletions src/ManualTests.HostV4/TriggerMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ public class TriggerMessageHandler : IHandleMessages<TriggerMessage>
{
static readonly ILog Log = LogManager.GetLogger<TriggerMessageHandler>();

public Task Handle(TriggerMessage message, IMessageHandlerContext context)
public async Task Handle(TriggerMessage message, IMessageHandlerContext context)
{
Log.Warn($"Handling {nameof(TriggerMessage)} in {nameof(TriggerMessageHandler)}");

return context.SendLocal(new SomeOtherMessage());
await context.SendLocal(new SomeOtherMessage()).ConfigureAwait(false);
await context.Publish(new SomeEvent()).ConfigureAwait(false);
}
}
7 changes: 6 additions & 1 deletion src/ManualTests.HostV4/host.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
{
"version": "2.0"
"version": "2.0",
"extensions": {
"ServiceBus": {
"EnableCrossEntityTransactions": true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public static void UseNServiceBus(
Action<ServiceBusTriggeredEndpointConfiguration> configurationFactory = null)
{
var config = functionsHostBuilder.GetContext().Configuration;
var serviceBusConfiguration = new ServiceBusTriggeredEndpointConfiguration(endpointName, config, null);

var serviceBusConfiguration = new ServiceBusTriggeredEndpointConfiguration(endpointName, config, GetTransportTransactionMode());
configurationFactory?.Invoke(serviceBusConfiguration);
RegisterEndpointFactory(functionsHostBuilder, serviceBusConfiguration);
}
Expand All @@ -60,7 +61,7 @@ public static void UseNServiceBus(
Action<ServiceBusTriggeredEndpointConfiguration> configurationFactory = null)
{
var config = functionsHostBuilder.GetContext().Configuration;
var serviceBusConfiguration = new ServiceBusTriggeredEndpointConfiguration(endpointName, config, connectionString);
var serviceBusConfiguration = new ServiceBusTriggeredEndpointConfiguration(endpointName, config, GetTransportTransactionMode(), connectionString);
configurationFactory?.Invoke(serviceBusConfiguration);
RegisterEndpointFactory(functionsHostBuilder, serviceBusConfiguration);
}
Expand Down Expand Up @@ -109,5 +110,19 @@ internal static IStartableEndpointWithExternallyManagedContainer Configure(
endpointConfiguration,
serviceCollection);
}

static TransportTransactionMode GetTransportTransactionMode()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this hugely negatively affect the function startup time? Wouldn't it be better to rethink our current approach of hosting endpoints within functions to avoid having to scan assemblies?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the design needs a rethink but we are already doing the reflection for most user already since we pick the endpoint name from the attribute here https://github.com/Particular/NServiceBus.AzureFunctions.InProcess.ServiceBus/blob/master/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionsHostBuilderExtensions.cs#L24

Perhaps we can make this more efficient somehow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So by looking at the code you posted, there is an assumption this attribute is placed into the calling assembly. So, why don't we try to get that attribute for now in all those extensions once and then piggyback the information from there into the configuration? Is there a legacy that would prevent us from doing that?

{
foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
{
var attribute = assembly.GetCustomAttribute<NServiceBusTriggerFunctionAttribute>();

if (attribute != null)
{
return attribute.SendsAtomicWithReceive ? TransportTransactionMode.SendsAtomicWithReceive : TransportTransactionMode.ReceiveOnly;
}
}
return TransportTransactionMode.SendsAtomicWithReceive;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using AzureFunctions.InProcess.ServiceBus;
using Configuration.AdvancedExtensibility;
using Logging;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Configuration;
using Serialization;

Expand Down Expand Up @@ -40,7 +41,7 @@ static ServiceBusTriggeredEndpointConfiguration()
/// <summary>
/// Creates a serverless NServiceBus endpoint.
/// </summary>
internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfiguration configuration, string connectionString = default)
internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfiguration configuration, TransportTransactionMode transportTransactionMode, string connectionString = default)
{
var endpointConfiguration = new EndpointConfiguration(endpointName);

Expand Down Expand Up @@ -74,9 +75,23 @@ internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfigur
}
}

if (transportTransactionMode == TransportTransactionMode.SendsAtomicWithReceive)
{
var serviceBusOptions = configuration.GetSection("AzureFunctionsJobHost:extensions:ServiceBus")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a code first API that would allow not to go over the specific configuration section that could backfire on us here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find anything when I did the research and GetSection only accepts a string https://docs.microsoft.com/en-us/dotnet/api/system.configuration.configurationmanager.getsection?view=dotnet-plat-ext-6.0

.Get<ServiceBusOptions>();

if (serviceBusOptions == null || serviceBusOptions.EnableCrossEntityTransactions)
{
throw new Exception("SendsAtomicWithReceive mode requires EnableCrossEntityTransactions needs to be enabled on the ServiceBusOptions.");
}
}

Transport = new AzureServiceBusTransport(connectionString);

serverlessTransport = new ServerlessTransport(Transport);
serverlessTransport = new ServerlessTransport(Transport)
{
TransportTransactionMode = transportTransactionMode
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't strictly needed since our transport isn't receiving messages but I figured that this would be more future proof if we for some reason add some logic in the outgoing pipeline that makes decisions based on the transaction mode

};
var serverlessRouting = endpointConfiguration.UseTransport(serverlessTransport);
Routing = new RoutingSettings<AzureServiceBusTransport>(serverlessRouting.GetSettings());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public FunctionRunner(IList<object> messages,

public override Task Start(CancellationToken token)
{
var functionEndpointConfiguration = new ServiceBusTriggeredEndpointConfiguration(Name, default, null);
var functionEndpointConfiguration = new ServiceBusTriggeredEndpointConfiguration(Name, default, TransportTransactionMode.ReceiveOnly);
configurationCustomization(functionEndpointConfiguration);
var endpointConfiguration = functionEndpointConfiguration.AdvancedConfiguration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void Should_guide_user_towards_success()
Environment.SetEnvironmentVariable(defaultConnectionStringKey, null, EnvironmentVariableTarget.Process);

var exception = Assert.Throws<Exception>(
() => new ServiceBusTriggeredEndpointConfiguration("SampleEndpoint", default, null),
() => new ServiceBusTriggeredEndpointConfiguration("SampleEndpoint", default, TransportTransactionMode.ReceiveOnly),
"Exception should be thrown at endpoint creation so that the error will be found during functions startup"
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task Should_load_handlers_from_assembly_when_using_FunctionsHostBui

var serviceCollection = new ServiceCollection();

var configuration = new ServiceBusTriggeredEndpointConfiguration("assemblyTest", default, null);
var configuration = new ServiceBusTriggeredEndpointConfiguration("assemblyTest", default, TransportTransactionMode.ReceiveOnly);
configuration.UseSerialization<XmlSerializer>();

var endpointConfiguration = configuration.AdvancedConfiguration;
Expand Down