Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 103 additions & 1 deletion src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private async Task InitializeEndpointIfNecessary(FunctionExecutionContext execut
LogManager.GetLogger("Previews").Info(
"NServiceBus.AzureFunctions.ServiceBus is a preview package. Preview packages are licensed separately from the rest of the Particular Software platform and have different support guarantees. You can view the license at https://particular.net/eula/previews and the support policy at https://docs.particular.net/previews/support-policy. Customer adoption drives whether NServiceBus.AzureFunctions.ServiceBus will be incorporated into the Particular Software platform. Let us know you are using it, if you haven't already, by emailing us at support@particular.net.");

var endpoint = await endpointFactory(executionContext).ConfigureAwait(false);
endpoint = await endpointFactory(executionContext).ConfigureAwait(false);

pipeline = configuration.PipelineInvoker;
}
Expand All @@ -121,6 +121,107 @@ private async Task InitializeEndpointIfNecessary(FunctionExecutionContext execut
}
}

/// <inheritdoc />
public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Send(message, options).ConfigureAwait(false);
}

/// <inheritdoc />
public Task Send(object message, ExecutionContext executionContext, ILogger functionsLogger = null)
{
return Send(message, new SendOptions(), executionContext, functionsLogger);
}

/// <inheritdoc />
public async Task Send<T>(Action<T> messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Send(messageConstructor, options).ConfigureAwait(false);
}

/// <inheritdoc />
public Task Send<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null)
{
return Send(messageConstructor, new SendOptions(), executionContext, functionsLogger);
}

/// <inheritdoc />
public async Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Publish(message, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Publish(messageConstructor, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Publish(message).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Publish(messageConstructor).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Subscribe(eventType, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Subscribe(eventType).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Unsubscribe(eventType, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Unsubscribe(eventType).ConfigureAwait(false);
}

private async Task InitializeEndpointUsedOutsideHandlerIfNecessary(ExecutionContext executionContext, ILogger functionsLogger)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);

await InitializeEndpointIfNecessary(functionExecutionContext).ConfigureAwait(false);
}

internal static void LoadAssemblies(string assemblyDirectory)
{
var binFiles = Directory.EnumerateFiles(
Expand Down Expand Up @@ -189,5 +290,6 @@ static bool IsRuntimeAssembly(byte[] publicKeyToken)
private ServiceBusTriggeredEndpointConfiguration configuration;

PipelineInvoker pipeline;
private IEndpointInstance endpoint;
}
}
66 changes: 65 additions & 1 deletion src/NServiceBus.AzureFunctions.ServiceBus/IFunctionEndpoint.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace NServiceBus
using System;

namespace NServiceBus
{
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
Expand All @@ -15,5 +17,67 @@ public interface IFunctionEndpoint
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
/// </summary>
Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Sends the provided message.
/// </summary>
Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Sends the provided message.
/// </summary>
Task Send(object message, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Instantiates a message of type T and sends it.
/// </summary>
Task Send<T>(Action<T> messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Instantiates a message of type T and sends it.
/// </summary>
Task Send<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Publish the message to subscribers.
/// </summary>
Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Instantiates a message of type T and publishes it.
/// </summary>
Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Instantiates a message of type T and publishes it.
/// </summary>
Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Instantiates a message of type T and publishes it.
/// </summary>
Task Publish<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Subscribes to receive published messages of the specified type.
/// This method is only necessary if you turned off auto-subscribe.
/// </summary>
Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Subscribes to receive published messages of the specified type.
/// This method is only necessary if you turned off auto-subscribe.
/// </summary>
Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Unsubscribes to receive published messages of the specified type.
/// </summary>
Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);

/// <summary>
/// Unsubscribes to receive published messages of the specified type.
/// </summary>
Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="[1.1, 2)" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="[2.2.0, 3)" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="[1.5.0, 2)" />
<PackageReference Include="NServiceBus.Extensions.DependencyInjection" Version="[1.0,2)" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="[4.1.0, 5.0.0)" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="2.2.0" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="1.8.0" />
<PackageReference Include="NServiceBus.Extensions.DependencyInjection" Version="1.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.2.0" />
<PackageReference Include="Particular.CodeRules" Version="0.7.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="0.8.0" PrivateAssets="All" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@ namespace NServiceBus
protected System.Func<NServiceBus.FunctionExecutionContext, string> AssemblyDirectoryResolver;
public FunctionEndpoint(System.Func<NServiceBus.FunctionExecutionContext, NServiceBus.ServiceBusTriggeredEndpointConfiguration> configurationFactory) { }
public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Send(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Subscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
}
public class FunctionExecutionContext
{
Expand All @@ -20,6 +32,18 @@ namespace NServiceBus
public interface IFunctionEndpoint
{
System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Send(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Subscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
}
public class ServiceBusTriggeredEndpointConfiguration
{
Expand Down
14 changes: 9 additions & 5 deletions src/ServiceBus.Tests/DefaultEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,21 @@ public Task<EndpointConfiguration> GetConfiguration(

var transport = configuration.UseTransport<AzureServiceBusTransport>();
transport.ConnectionString(Environment.GetEnvironmentVariable(ServiceBusTriggeredEndpointConfiguration.DefaultServiceBusConnectionName));
transport.RuleNameShortener(x => x
.Replace(typeof(DefaultEndpoint).Namespace, string.Empty)
.Replace("+", string.Empty));
transport.SubscriptionRuleNamingConvention(type =>
{
if (type.FullName.Length <= 50)
{
return type.FullName;
}

return type.Name;
});

configuration.UseSerialization<NewtonsoftSerializer>();

configurationBuilderCustomization(configuration);

return Task.FromResult(configuration);
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace ServiceBus.Tests
{
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;

public class When_publishing_event_from_function_outside_handler
{
[Test]
public async Task Should_publish_to_subscribers()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<SubscriberEndpoint>(b =>
b.When(async session => await session.Publish(new TestEvent())))
.Done(c => c.EventReceived)
.Run();

Assert.IsTrue(context.EventReceived);
}

class Context : ScenarioContext
{
public bool EventReceived { get; set; }
}

class SubscriberEndpoint : EndpointConfigurationBuilder
{
public SubscriberEndpoint()
{
EndpointSetup<DefaultEndpoint>();
}

public class EventHandler : IHandleMessages<TestEvent>
{
Context testContext;

public EventHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(TestEvent message, IMessageHandlerContext context)
{
testContext.EventReceived = true;
return Task.CompletedTask;
}
}
}

class TestEvent : IEvent
{
}
}
}
Loading