Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
namespace NServiceBus
{
using System;
using System.IO;
using System.Reflection;
using System.Runtime.Loader;
using Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
using AzureFunctions.ServiceBus;
using Extensibility;
using Logging;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
using Transport;
Expand All @@ -20,11 +20,8 @@
/// </summary>
public class FunctionEndpoint
{
private readonly Func<FunctionExecutionContext, Task<IEndpointInstance>> endpointFactory;
private ServiceBusTriggeredEndpointConfiguration configuration;

/// <summary>
/// Creates a new instance of <see cref="FunctionEndpoint"/> that can handle messages using the provided configuration.
/// Creates a new instance of <see cref="FunctionEndpoint" /> that can handle messages using the provided configuration.
/// </summary>
public FunctionEndpoint(Func<FunctionExecutionContext, ServiceBusTriggeredEndpointConfiguration> configurationFactory)
{
Expand All @@ -42,7 +39,7 @@ internal FunctionEndpoint(IStartableEndpointWithExternallyManagedContainer exter
ServiceBusTriggeredEndpointConfiguration configuration, IServiceProvider serviceProvider)
{
this.configuration = configuration;
this.endpointFactory = _ => externallyManagedContainerEndpoint.Start(serviceProvider);
endpointFactory = _ => externallyManagedContainerEndpoint.Start(serviceProvider);
}

/// <summary>
Expand Down Expand Up @@ -186,7 +183,10 @@ static bool IsRuntimeAssembly(byte[] publicKeyToken)
protected Func<FunctionExecutionContext, string> AssemblyDirectoryResolver = functionExecutionContext =>
Path.Combine(functionExecutionContext.ExecutionContext.FunctionAppDirectory, "bin");

private readonly Func<FunctionExecutionContext, Task<IEndpointInstance>> endpointFactory;

readonly SemaphoreSlim semaphoreLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private ServiceBusTriggeredEndpointConfiguration configuration;

PipelineInvoker pipeline;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Security.Cryptography;
using System.Text;

namespace NServiceBus.AzureFunctions.ServiceBus
{
static class DeterministicGuid
{
public static Guid Create(string data)
{
// use MD5 hash to get a 16-byte hash of the string
using (var provider = new MD5CryptoServiceProvider())
{
var inputBytes = Encoding.Default.GetBytes(data);
var hashBytes = provider.ComputeHash(inputBytes);
// generate a guid from the hash:
return new Guid(hashBytes);
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
namespace NServiceBus
{
using Logging;
using Microsoft.Azure.WebJobs;
using System;
using System.Threading.Tasks;
using AzureFunctions.ServiceBus;
using Logging;
using Microsoft.Azure.WebJobs;
using Serialization;
using Transport;

/// <summary>
/// Represents a serverless NServiceBus endpoint running within an AzureServiceBus trigger.
/// </summary>
public class ServiceBusTriggeredEndpointConfiguration : ServerlessEndpointConfiguration
public class ServiceBusTriggeredEndpointConfiguration
{
internal const string DefaultServiceBusConnectionName = "AzureWebJobsServiceBus";

/// <summary>
/// Azure Service Bus transport
/// </summary>
public TransportExtensions<AzureServiceBusTransport> Transport { get; }

static ServiceBusTriggeredEndpointConfiguration()
{
LogManager.UseFactory(FunctionsLoggerFactory.Instance);
Expand All @@ -25,11 +21,34 @@ static ServiceBusTriggeredEndpointConfiguration()
/// <summary>
/// Creates a serverless NServiceBus endpoint running within an Azure Service Bus trigger.
/// </summary>
public ServiceBusTriggeredEndpointConfiguration(string endpointName, string connectionStringName = null) : base(endpointName)
public ServiceBusTriggeredEndpointConfiguration(string endpointName, string connectionStringName = null)
{
EndpointConfiguration = new EndpointConfiguration(endpointName);

recoverabilityPolicy.SendFailedMessagesToErrorQueue = true;
EndpointConfiguration.Recoverability().CustomPolicy(recoverabilityPolicy.Invoke);

// Disable diagnostics by default as it will fail to create the diagnostics file in the default path.
// Can be overriden by ServerlessEndpointConfiguration.LogDiagnostics().
EndpointConfiguration.CustomDiagnosticsWriter(_ => Task.CompletedTask);

// 'WEBSITE_SITE_NAME' represents an Azure Function App and the environment variable is set when hosting the function in Azure.
var functionAppName = Environment.GetEnvironmentVariable("WEBSITE_SITE_NAME") ?? Environment.MachineName;
EndpointConfiguration.UniquelyIdentifyRunningInstance()
.UsingCustomDisplayName(functionAppName)
.UsingCustomIdentifier(DeterministicGuid.Create(functionAppName));

// Look for license as an environment variable
var licenseText = Environment.GetEnvironmentVariable("NSERVICEBUS_LICENSE");
if (!string.IsNullOrWhiteSpace(licenseText))
{
EndpointConfiguration.License(licenseText);
}

Transport = UseTransport<AzureServiceBusTransport>();

var connectionString = Environment.GetEnvironmentVariable(connectionStringName ?? DefaultServiceBusConnectionName);
var connectionString =
Environment.GetEnvironmentVariable(connectionStringName ?? DefaultServiceBusConnectionName);
Transport.ConnectionString(connectionString);

var recoverability = AdvancedConfiguration.Recoverability();
Expand All @@ -40,7 +59,21 @@ public ServiceBusTriggeredEndpointConfiguration(string endpointName, string conn
}

/// <summary>
/// Attempts to derive the required configuration parameters automatically from the Azure Functions related attributes via reflection.
/// Azure Service Bus transport
/// </summary>
public TransportExtensions<AzureServiceBusTransport> Transport { get; }

internal EndpointConfiguration EndpointConfiguration { get; }
internal PipelineInvoker PipelineInvoker { get; private set; }

/// <summary>
/// Gives access to the underlying endpoint configuration for advanced configuration options.
/// </summary>
public EndpointConfiguration AdvancedConfiguration => EndpointConfiguration;

/// <summary>
/// Attempts to derive the required configuration parameters automatically from the Azure Functions related attributes via
/// reflection.
/// </summary>
public static ServiceBusTriggeredEndpointConfiguration FromAttributes()
{
Expand All @@ -50,7 +83,51 @@ public static ServiceBusTriggeredEndpointConfiguration FromAttributes()
return new ServiceBusTriggeredEndpointConfiguration(configuration.QueueName, configuration.Connection);
}

throw new Exception($"Unable to automatically derive the endpoint name from the ServiceBusTrigger attribute. Make sure the attribute exists or create the {nameof(ServiceBusTriggeredEndpointConfiguration)} with the required parameter manually.");
throw new Exception(
$"Unable to automatically derive the endpoint name from the ServiceBusTrigger attribute. Make sure the attribute exists or create the {nameof(ServiceBusTriggeredEndpointConfiguration)} with the required parameter manually.");
}

/// <summary>
/// Define a transport to be used when sending and publishing messages.
/// </summary>
protected TransportExtensions<TTransport> UseTransport<TTransport>()
where TTransport : TransportDefinition, new()
{
var serverlessTransport = EndpointConfiguration.UseTransport<ServerlessTransport<TTransport>>();

PipelineInvoker = serverlessTransport.PipelineAccess();
return serverlessTransport.BaseTransportConfiguration();
}

/// <summary>
/// Define the serializer to be used.
/// </summary>
public SerializationExtensions<T> UseSerialization<T>() where T : SerializationDefinition, new()
{
return EndpointConfiguration.UseSerialization<T>();
}

/// <summary>
/// Disables moving messages to the error queue even if an error queue name is configured.
/// </summary>
public void DoNotSendMessagesToErrorQueue()
{
recoverabilityPolicy.SendFailedMessagesToErrorQueue = false;
}

/// <summary>
/// Logs endpoint diagnostics information to the log. Diagnostics are logged on level <see cref="LogLevel.Info" />.
/// </summary>
public void LogDiagnostics()
{
EndpointConfiguration.CustomDiagnosticsWriter(diagnostics =>
{
LogManager.GetLogger("StartupDiagnostics").Info(diagnostics);
return Task.CompletedTask;
});
}

private readonly ServerlessRecoverabilityPolicy recoverabilityPolicy = new ServerlessRecoverabilityPolicy();
internal const string DefaultServiceBusConnectionName = "AzureWebJobsServiceBus";
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,4 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"ServiceBus.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
namespace NServiceBus.AzureFunctions.ServiceBus
{
public abstract class ServerlessEndpointConfiguration
{
protected ServerlessEndpointConfiguration(string endpointName) { }
public NServiceBus.EndpointConfiguration AdvancedConfiguration { get; }
public void DoNotSendMessagesToErrorQueue() { }
public void LogDiagnostics() { }
public NServiceBus.Serialization.SerializationExtensions<T> UseSerialization<T>()
where T : NServiceBus.Serialization.SerializationDefinition, new () { }
protected NServiceBus.TransportExtensions<TTransport> UseTransport<TTransport>()
where TTransport : NServiceBus.Transport.TransportDefinition, new () { }
}
}
namespace NServiceBus
{
public class FunctionEndpoint
Expand All @@ -31,10 +17,17 @@ namespace NServiceBus
{
public static void UseNServiceBus(this Microsoft.Azure.Functions.Extensions.DependencyInjection.IFunctionsHostBuilder functionsHostBuilder, System.Func<NServiceBus.ServiceBusTriggeredEndpointConfiguration> configurationFactory) { }
}
public class ServiceBusTriggeredEndpointConfiguration : NServiceBus.AzureFunctions.ServiceBus.ServerlessEndpointConfiguration
public class ServiceBusTriggeredEndpointConfiguration
{
public ServiceBusTriggeredEndpointConfiguration(string endpointName, string connectionStringName = null) { }
public NServiceBus.EndpointConfiguration AdvancedConfiguration { get; }
public NServiceBus.TransportExtensions<NServiceBus.AzureServiceBusTransport> Transport { get; }
public void DoNotSendMessagesToErrorQueue() { }
public static NServiceBus.ServiceBusTriggeredEndpointConfiguration FromAttributes() { }
public void LogDiagnostics() { }
public NServiceBus.Serialization.SerializationExtensions<T> UseSerialization<T>()
where T : NServiceBus.Serialization.SerializationDefinition, new () { }
protected NServiceBus.TransportExtensions<TTransport> UseTransport<TTransport>()
where TTransport : NServiceBus.Transport.TransportDefinition, new () { }
}
}