diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs index 9cd5ac6e..c0aa1bb5 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs @@ -109,7 +109,7 @@ private async Task InitializeEndpointIfNecessary(FunctionExecutionContext execut LogManager.GetLogger("Previews").Info( "NServiceBus.AzureFunctions.ServiceBus is a preview package. Preview packages are licensed separately from the rest of the Particular Software platform and have different support guarantees. You can view the license at https://particular.net/eula/previews and the support policy at https://docs.particular.net/previews/support-policy. Customer adoption drives whether NServiceBus.AzureFunctions.ServiceBus will be incorporated into the Particular Software platform. Let us know you are using it, if you haven't already, by emailing us at support@particular.net."); - var endpoint = await endpointFactory(executionContext).ConfigureAwait(false); + endpoint = await endpointFactory(executionContext).ConfigureAwait(false); pipeline = configuration.PipelineInvoker; } @@ -121,6 +121,107 @@ private async Task InitializeEndpointIfNecessary(FunctionExecutionContext execut } } + /// + public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Send(message, options).ConfigureAwait(false); + } + + /// + public Task Send(object message, ExecutionContext executionContext, ILogger functionsLogger = null) + { + return Send(message, new SendOptions(), executionContext, functionsLogger); + } + + /// + public async Task Send(Action messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Send(messageConstructor, options).ConfigureAwait(false); + } + + /// + public Task Send(Action messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null) + { + return Send(messageConstructor, new SendOptions(), executionContext, functionsLogger); + } + + /// + public async Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Publish(message, options).ConfigureAwait(false); + } + + /// + public async Task Publish(Action messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Publish(messageConstructor, options).ConfigureAwait(false); + } + + /// + public async Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Publish(message).ConfigureAwait(false); + } + + /// + public async Task Publish(Action messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Publish(messageConstructor).ConfigureAwait(false); + } + + /// + public async Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Subscribe(eventType, options).ConfigureAwait(false); + } + + /// + public async Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Subscribe(eventType).ConfigureAwait(false); + } + + /// + public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Unsubscribe(eventType, options).ConfigureAwait(false); + } + + /// + public async Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null) + { + await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false); + + await endpoint.Unsubscribe(eventType).ConfigureAwait(false); + } + + private async Task InitializeEndpointUsedOutsideHandlerIfNecessary(ExecutionContext executionContext, ILogger functionsLogger) + { + FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); + + var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger); + + await InitializeEndpointIfNecessary(functionExecutionContext).ConfigureAwait(false); + } + internal static void LoadAssemblies(string assemblyDirectory) { var binFiles = Directory.EnumerateFiles( @@ -189,5 +290,6 @@ static bool IsRuntimeAssembly(byte[] publicKeyToken) private ServiceBusTriggeredEndpointConfiguration configuration; PipelineInvoker pipeline; + private IEndpointInstance endpoint; } } \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/IFunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.ServiceBus/IFunctionEndpoint.cs index 07ab189d..3575ded3 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/IFunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.ServiceBus/IFunctionEndpoint.cs @@ -1,4 +1,6 @@ -namespace NServiceBus +using System; + +namespace NServiceBus { using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; @@ -15,5 +17,67 @@ public interface IFunctionEndpoint /// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline. /// Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Sends the provided message. + /// + Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Sends the provided message. + /// + Task Send(object message, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Instantiates a message of type T and sends it. + /// + Task Send(Action messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Instantiates a message of type T and sends it. + /// + Task Send(Action messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Publish the message to subscribers. + /// + Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Instantiates a message of type T and publishes it. + /// + Task Publish(Action messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Instantiates a message of type T and publishes it. + /// + Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Instantiates a message of type T and publishes it. + /// + Task Publish(Action messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Subscribes to receive published messages of the specified type. + /// This method is only necessary if you turned off auto-subscribe. + /// + Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Subscribes to receive published messages of the specified type. + /// This method is only necessary if you turned off auto-subscribe. + /// + Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Unsubscribes to receive published messages of the specified type. + /// + Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null); + + /// + /// Unsubscribes to receive published messages of the specified type. + /// + Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null); } } \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.ServiceBus/NServiceBus.AzureFunctions.ServiceBus.csproj b/src/NServiceBus.AzureFunctions.ServiceBus/NServiceBus.AzureFunctions.ServiceBus.csproj index f60f9abc..8f6759f7 100644 --- a/src/NServiceBus.AzureFunctions.ServiceBus/NServiceBus.AzureFunctions.ServiceBus.csproj +++ b/src/NServiceBus.AzureFunctions.ServiceBus/NServiceBus.AzureFunctions.ServiceBus.csproj @@ -7,11 +7,11 @@ - - - - - + + + + + diff --git a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 8a57a8fe..3514ce85 100644 --- a/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -6,6 +6,18 @@ namespace NServiceBus protected System.Func AssemblyDirectoryResolver; public FunctionEndpoint(System.Func configurationFactory) { } public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Publish(System.Action messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Publish(System.Action messageConstructor, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Send(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Send(System.Action messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Send(System.Action messageConstructor, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Subscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } + public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { } } public class FunctionExecutionContext { @@ -20,6 +32,18 @@ namespace NServiceBus public interface IFunctionEndpoint { System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Publish(System.Action messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Publish(System.Action messageConstructor, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Send(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Send(System.Action messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Send(System.Action messageConstructor, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Subscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); + System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null); } public class ServiceBusTriggeredEndpointConfiguration { diff --git a/src/ServiceBus.Tests/DefaultEndpoint.cs b/src/ServiceBus.Tests/DefaultEndpoint.cs index 3fd466bb..55d6e706 100644 --- a/src/ServiceBus.Tests/DefaultEndpoint.cs +++ b/src/ServiceBus.Tests/DefaultEndpoint.cs @@ -31,9 +31,15 @@ public Task GetConfiguration( var transport = configuration.UseTransport(); transport.ConnectionString(Environment.GetEnvironmentVariable(ServiceBusTriggeredEndpointConfiguration.DefaultServiceBusConnectionName)); - transport.RuleNameShortener(x => x - .Replace(typeof(DefaultEndpoint).Namespace, string.Empty) - .Replace("+", string.Empty)); + transport.SubscriptionRuleNamingConvention(type => + { + if (type.FullName.Length <= 50) + { + return type.FullName; + } + + return type.Name; + }); configuration.UseSerialization(); @@ -41,7 +47,5 @@ public Task GetConfiguration( return Task.FromResult(configuration); } - - } } \ No newline at end of file diff --git a/src/ServiceBus.Tests/When_publishing_event_from_function_outside_handler.cs b/src/ServiceBus.Tests/When_publishing_event_from_function_outside_handler.cs new file mode 100644 index 00000000..276bd82a --- /dev/null +++ b/src/ServiceBus.Tests/When_publishing_event_from_function_outside_handler.cs @@ -0,0 +1,55 @@ +namespace ServiceBus.Tests +{ + using System.Threading.Tasks; + using NServiceBus; + using NServiceBus.AcceptanceTesting; + using NUnit.Framework; + + public class When_publishing_event_from_function_outside_handler + { + [Test] + public async Task Should_publish_to_subscribers() + { + var context = await Scenario.Define() + .WithEndpoint(b => + b.When(async session => await session.Publish(new TestEvent()))) + .Done(c => c.EventReceived) + .Run(); + + Assert.IsTrue(context.EventReceived); + } + + class Context : ScenarioContext + { + public bool EventReceived { get; set; } + } + + class SubscriberEndpoint : EndpointConfigurationBuilder + { + public SubscriberEndpoint() + { + EndpointSetup(); + } + + public class EventHandler : IHandleMessages + { + Context testContext; + + public EventHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(TestEvent message, IMessageHandlerContext context) + { + testContext.EventReceived = true; + return Task.CompletedTask; + } + } + } + + class TestEvent : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/ServiceBus.Tests/When_sending_message_outside_handler.cs b/src/ServiceBus.Tests/When_sending_message_outside_handler.cs new file mode 100644 index 00000000..3c7e9336 --- /dev/null +++ b/src/ServiceBus.Tests/When_sending_message_outside_handler.cs @@ -0,0 +1,58 @@ +namespace ServiceBus.Tests +{ + using System.Threading.Tasks; + using NServiceBus; + using NServiceBus.AcceptanceTesting; + using NUnit.Framework; + using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; + + public class When_sending_message_outside_handler + { + [Test] + public async Task Should_send_message_to_target_queue() + { + await Scenario.Define() + .WithEndpoint(b => b.When(async session => + { + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + await session.Send(new TriggerMessage(), sendOptions); + })) + .Done(c => c.HandlerReceivedMessage) + .Run(); + } + + class Context : ScenarioContext + { + public bool HandlerReceivedMessage { get; set; } + } + + public class ReceivingEndpoint : EndpointConfigurationBuilder + { + public ReceivingEndpoint() + { + EndpointSetup(); + } + + class TriggerMessageHandler : IHandleMessages + { + Context testContext; + + public TriggerMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(TriggerMessage message, IMessageHandlerContext context) + { + testContext.HandlerReceivedMessage = true; + return Task.CompletedTask; + } + } + } + + class TriggerMessage : IMessage + { + } + } +} \ No newline at end of file