Skip to content

Commit

Permalink
add different delivery modes for delayed messages - fix #27
Browse files Browse the repository at this point in the history
  • Loading branch information
charlessolar committed Jan 3, 2018
1 parent 227abce commit bfca97d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,23 @@ private async Task InvokeDelayedChannel(IDelayedChannel channel, string channelK

using (var ctx = _metrics.Begin("Bulk Messages Time"))
{
foreach (var idx in Enumerable.Range(0, messages.Length))
await handler.Invoke(messages[idx].Message, context).ConfigureAwait(false);
switch (attr.Mode)
{
case DeliveryMode.Single:
foreach (var idx in Enumerable.Range(0, messages.Length))
await handler.Invoke(messages[idx].Message, context).ConfigureAwait(false);
break;
case DeliveryMode.First:
await handler.Invoke(messages[0].Message, context).ConfigureAwait(false);
break;
case DeliveryMode.Last:
await handler.Invoke(messages[messages.Length-1].Message, context).ConfigureAwait(false);
break;
case DeliveryMode.FirstAndLast:
await handler.Invoke(messages[0].Message, context).ConfigureAwait(false);
await handler.Invoke(messages[messages.Length - 1].Message, context).ConfigureAwait(false);
break;
}

if(ctx.Elapsed > TimeSpan.FromSeconds(5))
SlowLogger.InfoEvent("Invoked", "{Count} messages channel [{Channel:l}] key [{Key:l}] took {Milliseconds} ms", count, channelKey, specificKey, ctx.Elapsed.TotalMilliseconds);
Expand Down
24 changes: 23 additions & 1 deletion src/Aggregates.NET/Attributes/DelayedAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,37 @@

namespace Aggregates.Attributes
{
public enum DeliveryMode
{
/// <summary>
/// Deliver each message as a single call to message handler
/// </summary>
Single,
/// <summary>
/// Deliver only the first message from a collection of waiting messages
/// </summary>
First,
/// <summary>
/// Deliver only the last message from a collection of waiting messages
/// </summary>
Last,
/// <summary>
/// Deliver both the first and last message from a collection of waiting messages
/// </summary>
FirstAndLast
}

[AttributeUsage(AttributeTargets.Class, Inherited = false, AllowMultiple = true)]
public class DelayedAttribute : Attribute
{
public DelayedAttribute(Type type, int count = -1, int delayMs = -1)
public DelayedAttribute(Type type, int count = -1, int delayMs = -1, DeliveryMode mode = DeliveryMode.Single)
{
this.Type = type;
if(count != -1)
this.Count = count;
if(delayMs != -1)
this.Delay = delayMs;
this.Mode = mode;

if (Count > 200000)
throw new ArgumentException($"{nameof(Count)} too large - maximum is 200000");
Expand All @@ -36,6 +57,7 @@ public DelayedAttribute(Type type, int count = -1, int delayMs = -1)
public Type Type { get; private set; }
public int? Count { get; private set; }
public int? Delay { get; private set; }
public DeliveryMode? Mode { get; private set; }
public Func<object, string> KeyPropertyFunc { get; private set; }
}

Expand Down

0 comments on commit bfca97d

Please sign in to comment.