diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 99c0c669be..11b6bce388 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -1,4 +1,5 @@ #region Licence + /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -67,7 +68,7 @@ public class CommandProcessor : IAmACommandProcessor private const string PROCESSCOMMAND = "Process Command"; private const string PROCESSEVENT = "Process Event"; private const string DEPOSITPOST = "Deposit Post"; - + /// /// Use this as an identifier for your that determines for how long to break the circuit when communication with the Work Queue fails. /// Register that policy with your such as @@ -95,7 +96,7 @@ public class CommandProcessor : IAmACommandProcessor /// You can use this an identifier for you own policies, if your generic policy is the same as your Work Queue policy. /// public const string RETRYPOLICYASYNC = "Paramore.Brighter.CommandProcessor.RetryPolicy.Async"; - + //We want to use double lock to let us pass parameters to the constructor from the first instance private static ExternalBusServices _bus = null; private static readonly object padlock = new object(); @@ -120,11 +121,11 @@ public class CommandProcessor : IAmACommandProcessor ) { _subscriberRegistry = subscriberRegistry; - + if (HandlerFactoryIsNotEitherIAmAHandlerFactorySyncOrAsync(handlerFactory)) throw new ArgumentException( "No HandlerFactory has been set - either an instance of IAmAHandlerFactorySync or IAmAHandlerFactoryAsync needs to be set"); - + if (handlerFactory is IAmAHandlerFactorySync handlerFactorySync) _handlerFactorySync = handlerFactorySync; if (handlerFactory is IAmAHandlerFactoryAsync handlerFactoryAsync) @@ -160,7 +161,7 @@ public class CommandProcessor : IAmACommandProcessor IAmAFeatureSwitchRegistry featureSwitchRegistry = null, InboxConfiguration inboxConfiguration = null, IAmABoxTransactionConnectionProvider boxTransactionConnectionProvider = null, - int outboxBulkChunkSize = 100, + int outboxBulkChunkSize = 100, IAmAMessageTransformerFactory messageTransformerFactory = null) { _requestContextFactory = requestContextFactory; @@ -169,7 +170,7 @@ public class CommandProcessor : IAmACommandProcessor _inboxConfiguration = inboxConfiguration; _boxTransactionConnectionProvider = boxTransactionConnectionProvider; _transformPipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); - + InitExtServiceBus(policyRegistry, outBox, outboxTimeout, producerRegistry, outboxBulkChunkSize); ConfigureCallbacks(producerRegistry); @@ -207,7 +208,7 @@ public class CommandProcessor : IAmACommandProcessor IAmAChannelFactory responseChannelFactory = null, InboxConfiguration inboxConfiguration = null, IAmABoxTransactionConnectionProvider boxTransactionConnectionProvider = null, - int outboxBulkChunkSize = 100, + int outboxBulkChunkSize = 100, IAmAMessageTransformerFactory messageTransformerFactory = null) : this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry) { @@ -217,9 +218,9 @@ public class CommandProcessor : IAmACommandProcessor _boxTransactionConnectionProvider = boxTransactionConnectionProvider; _replySubscriptions = replySubscriptions; _transformPipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); - + InitExtServiceBus(policyRegistry, outBox, outboxTimeout, producerRegistry, outboxBulkChunkSize); - + ConfigureCallbacks(producerRegistry); } @@ -251,20 +252,20 @@ public class CommandProcessor : IAmACommandProcessor IAmAFeatureSwitchRegistry featureSwitchRegistry = null, InboxConfiguration inboxConfiguration = null, IAmABoxTransactionConnectionProvider boxTransactionConnectionProvider = null, - int outboxBulkChunkSize = 100, + int outboxBulkChunkSize = 100, IAmAMessageTransformerFactory messageTransformerFactory = null) : this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry) { _inboxConfiguration = inboxConfiguration; _boxTransactionConnectionProvider = boxTransactionConnectionProvider; _transformPipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); - + InitExtServiceBus(policyRegistry, outBox, outboxTimeout, producerRegistry, outboxBulkChunkSize); ConfigureCallbacks(producerRegistry); } - /// + /// /// Sends the specified command. We expect only one handler. The command is handled synchronously. /// /// @@ -275,7 +276,7 @@ public class CommandProcessor : IAmACommandProcessor { if (_handlerFactorySync == null) throw new InvalidOperationException("No handler factory defined."); - + var span = GetSpan(PROCESSCOMMAND); command.Span = span.span; @@ -304,7 +305,6 @@ public class CommandProcessor : IAmACommandProcessor { EndSpan(span.span); } - } } @@ -316,7 +316,8 @@ public class CommandProcessor : IAmACommandProcessor /// Should we use the calling thread's synchronization context when continuing or a default thread synchronization context. Defaults to false /// Allows the sender to cancel the request pipeline. Optional /// awaitable . - public async Task SendAsync(T command, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) + public async Task SendAsync(T command, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { if (_handlerFactoryAsync == null) @@ -324,7 +325,7 @@ public async Task SendAsync(T command, bool continueOnCapturedContext = false var span = GetSpan(PROCESSCOMMAND); command.Span = span.span; - + var requestContext = _requestContextFactory.Create(); requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; @@ -333,14 +334,16 @@ public async Task SendAsync(T command, bool continueOnCapturedContext = false { try { - s_logger.LogInformation("Building send async pipeline for command: {CommandType} {Id}", command.GetType(), command.Id); - var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); + s_logger.LogInformation("Building send async pipeline for command: {CommandType} {Id}", + command.GetType(), command.Id); + var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); - AssertValidSendPipeline(command, handlerChain.Count()); + AssertValidSendPipeline(command, handlerChain.Count()); - await handlerChain.First().HandleAsync(command, cancellationToken).ConfigureAwait(continueOnCapturedContext); + await handlerChain.First().HandleAsync(command, cancellationToken) + .ConfigureAwait(continueOnCapturedContext); } - catch (Exception) + catch (Exception) { span.span?.SetStatus(ActivityStatusCode.Error); throw; @@ -368,19 +371,21 @@ public async Task SendAsync(T command, bool continueOnCapturedContext = false var span = GetSpan(PROCESSEVENT); @event.Span = span.span; - + var requestContext = _requestContextFactory.Create(); requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; using (var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactorySync, _inboxConfiguration)) { - s_logger.LogInformation("Building send pipeline for event: {EventType} {Id}", @event.GetType(), @event.Id); + s_logger.LogInformation("Building send pipeline for event: {EventType} {Id}", @event.GetType(), + @event.Id); var handlerChain = builder.Build(requestContext); var handlerCount = handlerChain.Count(); - s_logger.LogInformation("Found {HandlerCount} pipelines for event: {EventType} {Id}", handlerCount, @event.GetType(), @event.Id); + s_logger.LogInformation("Found {HandlerCount} pipelines for event: {EventType} {Id}", handlerCount, + @event.GetType(), @event.Id); var exceptions = new List(); foreach (var handleRequests in handlerChain) @@ -391,19 +396,22 @@ public async Task SendAsync(T command, bool continueOnCapturedContext = false } catch (Exception e) { - exceptions.Add(e); + exceptions.Add(e); } } - if (span.created) { - if (exceptions.Any()) - span.span?.SetStatus(ActivityStatusCode.Error); - EndSpan(span.span); + if (span.created) + { + if (exceptions.Any()) + span.span?.SetStatus(ActivityStatusCode.Error); + EndSpan(span.span); } if (exceptions.Any()) { - throw new AggregateException("Failed to publish to one more handlers successfully, see inner exceptions for details", exceptions); + throw new AggregateException( + "Failed to publish to one more handlers successfully, see inner exceptions for details", + exceptions); } } } @@ -420,7 +428,8 @@ public async Task SendAsync(T command, bool continueOnCapturedContext = false /// Should we use the calling thread's synchronization context when continuing or a default thread synchronization context. Defaults to false /// Allows the sender to cancel the request pipeline. Optional /// awaitable . - public async Task PublishAsync(T @event, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) + public async Task PublishAsync(T @event, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { if (_handlerFactoryAsync == null) @@ -428,19 +437,21 @@ public async Task PublishAsync(T @event, bool continueOnCapturedContext = fal var span = GetSpan(PROCESSEVENT); @event.Span = span.span; - + var requestContext = _requestContextFactory.Create(); requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; using (var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactoryAsync, _inboxConfiguration)) { - s_logger.LogInformation("Building send async pipeline for event: {EventType} {Id}", @event.GetType(), @event.Id); + s_logger.LogInformation("Building send async pipeline for event: {EventType} {Id}", @event.GetType(), + @event.Id); var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); var handlerCount = handlerChain.Count(); - s_logger.LogInformation("Found {0} async pipelines for event: {EventType} {Id}", handlerCount, @event.GetType(), @event.Id); + s_logger.LogInformation("Found {0} async pipelines for event: {EventType} {Id}", handlerCount, + @event.GetType(), @event.Id); var exceptions = new List(); foreach (var handler in handlerChain) @@ -451,41 +462,27 @@ public async Task PublishAsync(T @event, bool continueOnCapturedContext = fal } catch (Exception e) { - exceptions.Add(e); + exceptions.Add(e); } } - if (span.created) { - if (exceptions.Any()) - span.span?.SetStatus(ActivityStatusCode.Error); - EndSpan(span.span); + if (span.created) + { + if (exceptions.Any()) + span.span?.SetStatus(ActivityStatusCode.Error); + EndSpan(span.span); } if (exceptions.Count > 0) { - throw new AggregateException("Failed to async publish to one more handlers successfully, see inner exceptions for details", exceptions); + throw new AggregateException( + "Failed to async publish to one more handlers successfully, see inner exceptions for details", + exceptions); } } } - private (Activity span, bool created) GetSpan(string activityName) - { - bool create = Activity.Current == null; - - if(create) - return (ApplicationTelemetry.ActivitySource.StartActivity(activityName, ActivityKind.Server), create); - else - return (Activity.Current, create); - } - - private void EndSpan(Activity span) - { - if (span?.Status == ActivityStatusCode.Unset) - span.SetStatus(ActivityStatusCode.Ok); - span?.Dispose(); - } - /// /// Posts the specified request. The message is placed on a task queue and into a outbox for reposting in the event of failure. /// You will need to configure a service that reads from the task queue to process the message @@ -520,11 +517,12 @@ private void EndSpan(Activity span) /// Allows the sender to cancel the request pipeline. Optional /// /// awaitable . - public async Task PostAsync(T request, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) + public async Task PostAsync(T request, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { var messageId = await DepositPostAsync(request, null, continueOnCapturedContext, cancellationToken); - await ClearOutboxAsync(new Guid[] {messageId}, continueOnCapturedContext, cancellationToken); + await ClearOutboxAsync(new Guid[] { messageId }, continueOnCapturedContext, cancellationToken); } /// @@ -541,7 +539,7 @@ public async Task PostAsync(T request, bool continueOnCapturedContext = false { return DepositPost(request, _boxTransactionConnectionProvider); } - + /// /// Adds a messages into the outbox, and returns the id of the saved message. /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ normally you include the @@ -556,8 +554,9 @@ public async Task PostAsync(T request, bool continueOnCapturedContext = false { return DepositPost(requests, _boxTransactionConnectionProvider); } - - private Guid DepositPost(T request, IAmABoxTransactionConnectionProvider connectionProvider) where T : class, IRequest + + private Guid DepositPost(T request, IAmABoxTransactionConnectionProvider connectionProvider) + where T : class, IRequest { s_logger.LogInformation("Save request: {RequestType} {Id}", request.GetType(), request.Id); @@ -572,22 +571,23 @@ public async Task PostAsync(T request, bool continueOnCapturedContext = false return message.Id; } - - private Guid[] DepositPost(IEnumerable requests, IAmABoxTransactionConnectionProvider connectionProvider) where T : class, IRequest + + private Guid[] DepositPost(IEnumerable requests, IAmABoxTransactionConnectionProvider connectionProvider) + where T : class, IRequest { if (!_bus.HasBulkOutbox()) throw new InvalidOperationException("No Bulk outbox defined."); var successfullySentMessage = new List(); - + foreach (var batch in SplitRequestBatchIntoTypes(requests)) { var messages = MapMessages(batch.Key, batch); - + s_logger.LogInformation("Save requests: {RequestType} {AmountOfMessages}", batch.Key, messages.Count()); _bus.AddToOutbox(messages, connectionProvider); - + successfullySentMessage.AddRange(messages.Select(m => m.Id)); } @@ -609,9 +609,10 @@ public async Task PostAsync(T request, bool continueOnCapturedContext = false public async Task DepositPostAsync(T request, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { - return await DepositPostAsync(request, _boxTransactionConnectionProvider, continueOnCapturedContext, cancellationToken); + return await DepositPostAsync(request, _boxTransactionConnectionProvider, continueOnCapturedContext, + cancellationToken); } - + /// /// Adds a message into the outbox, and returns the id of the saved message. /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ normally you include the @@ -627,10 +628,12 @@ public async Task PostAsync(T request, bool continueOnCapturedContext = false public Task DepositPostAsync(IEnumerable requests, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { - return DepositPostAsync(requests, _boxTransactionConnectionProvider, continueOnCapturedContext, cancellationToken); + return DepositPostAsync(requests, _boxTransactionConnectionProvider, continueOnCapturedContext, + cancellationToken); } - - private async Task DepositPostAsync(T request, IAmABoxTransactionConnectionProvider connectionProvider, bool continueOnCapturedContext = false, + + private async Task DepositPostAsync(T request, IAmABoxTransactionConnectionProvider connectionProvider, + bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { s_logger.LogInformation("Save request: {RequestType} {Id}", request.GetType(), request.Id); @@ -642,89 +645,11 @@ public async Task PostAsync(T request, bool continueOnCapturedContext = false AddTelemetryToMessage(message); - await _bus.AddToOutboxAsync(request, continueOnCapturedContext, cancellationToken, message, connectionProvider); + await _bus.AddToOutboxAsync(request, continueOnCapturedContext, cancellationToken, message, + connectionProvider); return message.Id; } - - private async Task DepositPostAsync(IEnumerable requests, IAmABoxTransactionConnectionProvider connectionProvider, bool continueOnCapturedContext = false, - CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest - { - if (!_bus.HasAsyncBulkOutbox()) - throw new InvalidOperationException("No bulk async outbox defined."); - - var successfullySentMessage = new List(); - - foreach (var batch in SplitRequestBatchIntoTypes(requests)) - { - var messages = await MapMessagesAsync(batch.Key, batch.ToArray(), cancellationToken); - - s_logger.LogInformation("Save requests: {RequestType} {AmountOfMessages}", batch.Key, messages.Count()); - - await _bus.AddToOutboxAsync(messages, continueOnCapturedContext, cancellationToken, connectionProvider); - - successfullySentMessage.AddRange(messages.Select(m => m.Id)); - } - - return successfullySentMessage.ToArray(); - } - - private IEnumerable> SplitRequestBatchIntoTypes(IEnumerable requests) - { - return requests.GroupBy(r => r.GetType()); - } - - private List MapMessages(Type requestType, IEnumerable requests) - { - return (List)GetType() - .GetMethod(nameof(BulkMapMessages), BindingFlags.Instance | BindingFlags.NonPublic) - .MakeGenericMethod(requestType) - .Invoke(this, new[] { requests }); - } - - private Task> MapMessagesAsync(Type requestType, IEnumerable requests, CancellationToken cancellationToken) - { - var parameters = new object[] { requests, cancellationToken }; - return (Task>)GetType() - .GetMethod(nameof(BulkMapMessagesAsync), BindingFlags.Instance | BindingFlags.NonPublic) - .MakeGenericMethod(requestType) - .Invoke(this, parameters); - } - - private List BulkMapMessages(IEnumerable requests) where T : class, IRequest - { - return requests.Select(r => - { - var wrapPipeline = _transformPipelineBuilder.BuildWrapPipeline(); - var message = wrapPipeline.WrapAsync((T)r).GetAwaiter().GetResult(); - AddTelemetryToMessage(message); - return message; - }).ToList(); - } - - private async Task> BulkMapMessagesAsync(IEnumerable requests, CancellationToken cancellationToken = default) where T : class, IRequest - { - var messages = new List(); - foreach (var request in requests) - { - var wrapPipeline = _transformPipelineBuilder.BuildWrapPipeline(); - var message = await wrapPipeline.WrapAsync((T)request, cancellationToken); - AddTelemetryToMessage(message); - messages.Add(message); - } - - return messages; - } - - private void AddTelemetryToMessage(Message message) - { - var activity = Activity.Current ?? ApplicationTelemetry.ActivitySource.StartActivity(DEPOSITPOST, ActivityKind.Producer); - - if (activity != null) - { - message.Header.AddTelemetryInformation(activity, typeof(T).ToString()); - } - } /// @@ -734,9 +659,9 @@ private void AddTelemetryToMessage(Message message) /// The posts to flush public void ClearOutbox(params Guid[] posts) { - _bus.ClearOutbox(posts); + _bus.ClearOutbox(posts); } - + /// /// Flushes any outstanding message box message to the broker. /// This will be run on a background task. @@ -745,9 +670,9 @@ public void ClearOutbox(params Guid[] posts) /// The maximum number to clear. /// The minimum age to clear in milliseconds. /// Optional bag of arguments required by an outbox implementation to sweep - public void ClearOutbox( int amountToClear = 100, int minimumAge = 5000, Dictionary args = null) + public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null) { - _bus.ClearOutbox(amountToClear, minimumAge, false, false, args); + _bus.ClearOutbox(amountToClear, minimumAge, false, false, args); } /// @@ -756,13 +681,13 @@ public void ClearOutbox( int amountToClear = 100, int minimumAge = 5000, Diction /// /// The posts to flush public async Task ClearOutboxAsync( - IEnumerable posts, + IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)) { - await _bus.ClearOutboxAsync(posts, continueOnCapturedContext, cancellationToken); + await _bus.ClearOutboxAsync(posts, continueOnCapturedContext, cancellationToken); } - + /// /// Flushes any outstanding message box message to the broker. /// This will be run on a background task. @@ -777,9 +702,9 @@ public void ClearOutbox( int amountToClear = 100, int minimumAge = 5000, Diction int minimumAge = 5000, bool useBulk = false, Dictionary args = null - ) + ) { - _bus.ClearOutbox(amountToClear, minimumAge, true, useBulk, args); + _bus.ClearOutbox(amountToClear, minimumAge, true, useBulk, args); } /// @@ -802,7 +727,7 @@ public void ClearOutbox( int amountToClear = 100, int minimumAge = 5000, Diction } var outWrapPipeline = _transformPipelineBuilder.BuildWrapPipeline(); - + var subscription = _replySubscriptions.FirstOrDefault(s => s.DataType == typeof(TResponse)); if (subscription is null) @@ -826,7 +751,7 @@ public void ClearOutbox( int amountToClear = 100, int minimumAge = 5000, Diction //the channel to create the subscription, but this does not do much on a new queue _bus.Retry(() => responseChannel.Purge()); - var outMessage = outWrapPipeline.WrapAsync(request).GetAwaiter().GetResult(); + var outMessage = outWrapPipeline.WrapAsync(request).GetAwaiter().GetResult(); //We don't store the message, if we continue to fail further retry is left to the sender //s_logger.LogDebug("Sending request with routingkey {0}", routingKey); @@ -852,9 +777,7 @@ public void ClearOutbox( int amountToClear = 100, int minimumAge = 5000, Diction s_logger.LogInformation("Deleting queue for routingkey: {ChannelName}", channelName); return response; - } //clean up everything at this point, whatever happens - } /// @@ -879,24 +802,45 @@ public static void ClearExtServiceBus() private void AssertValidSendPipeline(T command, int handlerCount) where T : class, IRequest { - s_logger.LogInformation("Found {HandlerCount} pipelines for command: {Type} {Id}", handlerCount, typeof(T), command.Id); + s_logger.LogInformation("Found {HandlerCount} pipelines for command: {Type} {Id}", handlerCount, typeof(T), + command.Id); if (handlerCount > 1) - throw new ArgumentException($"More than one handler was found for the typeof command {typeof(T)} - a command should only have one handler."); + throw new ArgumentException( + $"More than one handler was found for the typeof command {typeof(T)} - a command should only have one handler."); if (handlerCount == 0) - throw new ArgumentException($"No command handler was found for the typeof command {typeof(T)} - a command should have exactly one handler."); + throw new ArgumentException( + $"No command handler was found for the typeof command {typeof(T)} - a command should have exactly one handler."); } - - + + private void ConfigureCallbacks(IAmAProducerRegistry producerRegistry) { //Only register one, to avoid two callbacks where we support both interfaces on a producer foreach (var producer in producerRegistry.Producers) { - if (!_bus.ConfigurePublisherCallbackMaybe(producer)) _bus.ConfigureAsyncPublisherCallbackMaybe(producer); + if (!_bus.ConfigurePublisherCallbackMaybe(producer)) + _bus.ConfigureAsyncPublisherCallbackMaybe(producer); } } + private (Activity span, bool created) GetSpan(string activityName) + { + bool create = Activity.Current == null; + + if (create) + return (ApplicationTelemetry.ActivitySource.StartActivity(activityName, ActivityKind.Server), create); + else + return (Activity.Current, create); + } + + private void EndSpan(Activity span) + { + if (span?.Status == ActivityStatusCode.Unset) + span.SetStatus(ActivityStatusCode.Ok); + span?.Dispose(); + } + //Create an instance of the ExternalBusServices if one not already set for this app. Note that we do not support reinitialization here, so once you have //set a command processor for the app, you can't call init again to set them - although the properties are not read-only so overwriting is possible //if needed as a "get out of gaol" card. @@ -913,11 +857,13 @@ private void ConfigureCallbacks(IAmAProducerRegistry producerRegistry) { if (_bus == null) { - if (producerRegistry == null) throw new ConfigurationException("A producer registry is required to create an external bus"); - + if (producerRegistry == null) + throw new ConfigurationException( + "A producer registry is required to create an external bus"); + _bus = new ExternalBusServices(); - if(outbox is IAmAnOutboxSync syncOutbox)_bus.OutBox = syncOutbox; - if(outbox is IAmAnOutboxAsync asyncOutbox)_bus.AsyncOutbox = asyncOutbox; + if (outbox is IAmAnOutboxSync syncOutbox) _bus.OutBox = syncOutbox; + if (outbox is IAmAnOutboxAsync asyncOutbox) _bus.AsyncOutbox = asyncOutbox; _bus.OutboxTimeout = outboxTimeout; _bus.PolicyRegistry = policyRegistry; @@ -928,6 +874,90 @@ private void ConfigureCallbacks(IAmAProducerRegistry producerRegistry) } } + private async Task DepositPostAsync(IEnumerable requests, + IAmABoxTransactionConnectionProvider connectionProvider, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest + { + if (!_bus.HasAsyncBulkOutbox()) + throw new InvalidOperationException("No bulk async outbox defined."); + + var successfullySentMessage = new List(); + + foreach (var batch in SplitRequestBatchIntoTypes(requests)) + { + var messages = await MapMessagesAsync(batch.Key, batch.ToArray(), cancellationToken); + + s_logger.LogInformation("Save requests: {RequestType} {AmountOfMessages}", batch.Key, messages.Count()); + + await _bus.AddToOutboxAsync(messages, continueOnCapturedContext, cancellationToken, connectionProvider); + + successfullySentMessage.AddRange(messages.Select(m => m.Id)); + } + + return successfullySentMessage.ToArray(); + } + + private IEnumerable> SplitRequestBatchIntoTypes(IEnumerable requests) + { + return requests.GroupBy(r => r.GetType()); + } + + private List MapMessages(Type requestType, IEnumerable requests) + { + return (List)GetType() + .GetMethod(nameof(BulkMapMessages), BindingFlags.Instance | BindingFlags.NonPublic) + .MakeGenericMethod(requestType) + .Invoke(this, new[] { requests }); + } + + private Task> MapMessagesAsync(Type requestType, IEnumerable requests, + CancellationToken cancellationToken) + { + var parameters = new object[] { requests, cancellationToken }; + return (Task>)GetType() + .GetMethod(nameof(BulkMapMessagesAsync), BindingFlags.Instance | BindingFlags.NonPublic) + .MakeGenericMethod(requestType) + .Invoke(this, parameters); + } + + private List BulkMapMessages(IEnumerable requests) where T : class, IRequest + { + return requests.Select(r => + { + var wrapPipeline = _transformPipelineBuilder.BuildWrapPipeline(); + var message = wrapPipeline.WrapAsync((T)r).GetAwaiter().GetResult(); + AddTelemetryToMessage(message); + return message; + }).ToList(); + } + + private async Task> BulkMapMessagesAsync(IEnumerable requests, + CancellationToken cancellationToken = default) where T : class, IRequest + { + var messages = new List(); + foreach (var request in requests) + { + var wrapPipeline = _transformPipelineBuilder.BuildWrapPipeline(); + var message = await wrapPipeline.WrapAsync((T)request, cancellationToken); + AddTelemetryToMessage(message); + messages.Add(message); + } + + return messages; + } + + private void AddTelemetryToMessage(Message message) + { + var activity = Activity.Current ?? + ApplicationTelemetry.ActivitySource.StartActivity(DEPOSITPOST, ActivityKind.Producer); + + if (activity != null) + { + message.Header.AddTelemetryInformation(activity, typeof(T).ToString()); + } + } + + private bool HandlerFactoryIsNotEitherIAmAHandlerFactorySyncOrAsync(IAmAHandlerFactory handlerFactory) { // If we do not have a subscriber registry and we do not have a handler factory