diff --git a/src/Aggregates.NET.NServiceBus/Internal/BulkInvokeHandlerTerminator.cs b/src/Aggregates.NET.NServiceBus/Internal/BulkInvokeHandlerTerminator.cs index ea75114e..a51bf5ed 100644 --- a/src/Aggregates.NET.NServiceBus/Internal/BulkInvokeHandlerTerminator.cs +++ b/src/Aggregates.NET.NServiceBus/Internal/BulkInvokeHandlerTerminator.cs @@ -173,8 +173,23 @@ private async Task InvokeDelayedChannel(IDelayedChannel channel, string channelK using (var ctx = _metrics.Begin("Bulk Messages Time")) { - foreach (var idx in Enumerable.Range(0, messages.Length)) - await handler.Invoke(messages[idx].Message, context).ConfigureAwait(false); + switch (attr.Mode) + { + case DeliveryMode.Single: + foreach (var idx in Enumerable.Range(0, messages.Length)) + await handler.Invoke(messages[idx].Message, context).ConfigureAwait(false); + break; + case DeliveryMode.First: + await handler.Invoke(messages[0].Message, context).ConfigureAwait(false); + break; + case DeliveryMode.Last: + await handler.Invoke(messages[messages.Length-1].Message, context).ConfigureAwait(false); + break; + case DeliveryMode.FirstAndLast: + await handler.Invoke(messages[0].Message, context).ConfigureAwait(false); + await handler.Invoke(messages[messages.Length - 1].Message, context).ConfigureAwait(false); + break; + } if(ctx.Elapsed > TimeSpan.FromSeconds(5)) SlowLogger.InfoEvent("Invoked", "{Count} messages channel [{Channel:l}] key [{Key:l}] took {Milliseconds} ms", count, channelKey, specificKey, ctx.Elapsed.TotalMilliseconds); diff --git a/src/Aggregates.NET/Attributes/DelayedAttribute.cs b/src/Aggregates.NET/Attributes/DelayedAttribute.cs index 99364d17..59edb8cc 100644 --- a/src/Aggregates.NET/Attributes/DelayedAttribute.cs +++ b/src/Aggregates.NET/Attributes/DelayedAttribute.cs @@ -5,16 +5,37 @@ namespace Aggregates.Attributes { + public enum DeliveryMode + { + /// + /// Deliver each message as a single call to message handler + /// + Single, + /// + /// Deliver only the first message from a collection of waiting messages + /// + First, + /// + /// Deliver only the last message from a collection of waiting messages + /// + Last, + /// + /// Deliver both the first and last message from a collection of waiting messages + /// + FirstAndLast + } + [AttributeUsage(AttributeTargets.Class, Inherited = false, AllowMultiple = true)] public class DelayedAttribute : Attribute { - public DelayedAttribute(Type type, int count = -1, int delayMs = -1) + public DelayedAttribute(Type type, int count = -1, int delayMs = -1, DeliveryMode mode = DeliveryMode.Single) { this.Type = type; if(count != -1) this.Count = count; if(delayMs != -1) this.Delay = delayMs; + this.Mode = mode; if (Count > 200000) throw new ArgumentException($"{nameof(Count)} too large - maximum is 200000"); @@ -36,6 +57,7 @@ public DelayedAttribute(Type type, int count = -1, int delayMs = -1) public Type Type { get; private set; } public int? Count { get; private set; } public int? Delay { get; private set; } + public DeliveryMode? Mode { get; private set; } public Func KeyPropertyFunc { get; private set; } }