Skip to content

Commit

Permalink
Added an UnboundStablePriorityMailbox, sending messages according to …
Browse files Browse the repository at this point in the history
…priority. Messages with the same priority will be send using the same order as they appear. akkadotnet#2652 is the related issue.
  • Loading branch information
AndreSteenbergen committed Jul 5, 2018
1 parent d256162 commit 3ebec1f
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 8 deletions.
42 changes: 40 additions & 2 deletions src/core/Akka/Dispatch/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system)
/// Priority mailbox base class; unbounded mailbox that allows for prioritization of its contents.
/// Extend this class and implement the <see cref="PriorityGenerator"/> method with your own prioritization.
/// The value returned by the <see cref="PriorityGenerator"/> method will be used to order the message in the mailbox.
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order.
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivered in undefined order.
/// </summary>
public abstract class UnboundedPriorityMailbox : MailboxType, IProducesMessageQueue<UnboundedPriorityMessageQueue>
{
Expand Down Expand Up @@ -716,7 +716,45 @@ protected UnboundedPriorityMailbox(Settings settings, Config config) : base(sett
}
}

//todo: bounded priority mailbox; stable priority mailboxes
//todo: bounded priority mailbox;

/// <summary>
/// Priority mailbox - an unbounded mailbox that allows for prioritization of its contents.
/// Extend this class and implement the <see cref="PriorityGenerator"/> method with your own prioritization.
/// The value returned by the <see cref="PriorityGenerator"/> method will be used to order the message in the mailbox.
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order.
/// </summary>
public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue<UnboundedStablePriorityMessageQueue>
{
/// <summary>
/// Function responsible for generating the priority value of a message based on its type and content.
/// </summary>
/// <param name="message">The message to inspect.</param>
/// <returns>An integer. The lower the value, the higher the priority.</returns>
protected abstract int PriorityGenerator(object message);

/// <summary>
/// The initial capacity of the unbounded mailbox.
/// </summary>
public int InitialCapacity { get; }

/// <summary>
/// The default capacity of an unbounded priority mailbox.
/// </summary>
public const int DefaultCapacity = 11;

/// <inheritdoc cref="MailboxType"/>
public sealed override IMessageQueue Create(IActorRef owner, ActorSystem system)
{
return new UnboundedStablePriorityMessageQueue(PriorityGenerator, InitialCapacity);
}

/// <inheritdoc cref="MailboxType"/>
protected UnboundedStablePriorityMailbox(Settings settings, Config config) : base(settings, config)
{
InitialCapacity = DefaultCapacity;
}
}

/// <summary>
/// UnboundedDequeBasedMailbox is an unbounded <see cref="MailboxType"/> backed by a double-ended queue. Used for stashing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,4 @@ public void EnqueueFirst(Envelope envelope)
_prependBuffer.Push(envelope);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//-----------------------------------------------------------------------
// <copyright file="UnboundedPriorityMessageQueue.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Util;

namespace Akka.Dispatch.MessageQueues
{
/// <summary>
/// Base class for a message queue that uses a priority generator for messages
/// </summary>
public class UnboundedStablePriorityMessageQueue : BlockingMessageQueue, IUnboundedDequeBasedMessageQueueSemantics
{
private readonly StableListPriorityQueue _prioQueue;
// doesn't need to be threadsafe - only called from within actor
private readonly Stack<Envelope> _prependBuffer = new Stack<Envelope>();


/// <summary>
/// Creates a new unbounded priority message queue.
/// </summary>
/// <param name="priorityGenerator">The calculator function for determining the priority of inbound messages.</param>
/// <param name="initialCapacity">The initial capacity of the queue.</param>
public UnboundedStablePriorityMessageQueue(Func<object, int> priorityGenerator, int initialCapacity)
{
_prioQueue = new StableListPriorityQueue(initialCapacity, priorityGenerator);
}

/// <summary>
/// Unsafe method for computing the underlying message count.
/// </summary>
/// <remarks>
/// Called from within a synchronization mechanism.
/// </remarks>
protected override int LockedCount
{
get { return _prioQueue.Count(); }
}

/// <summary>
/// Unsafe method for enqueuing a new message to the queue.
/// </summary>
/// <param name="envelope">The message to enqueue.</param>
/// <remarks>
/// Called from within a synchronization mechanism.
/// </remarks>
protected override void LockedEnqueue(Envelope envelope)
{
_prioQueue.Enqueue(envelope);
}

/// <summary>
/// Unsafe method for attempting to dequeue a message.
/// </summary>
/// <param name="envelope">The message that might be dequeued.</param>
/// <returns><c>true</c> if a message was available to be dequeued, <c>false</c> otherwise.</returns>
/// <remarks>
/// Called from within a synchronization mechanism.
/// </remarks>
protected override bool LockedTryDequeue(out Envelope envelope)
{
if (_prependBuffer.Count > 0)
{
envelope = _prependBuffer.Pop();
return true;
}

if (_prioQueue.Count() > 0)
{
envelope = _prioQueue.Dequeue();
return true;
}
envelope = default(Envelope);
return false;
}

public void EnqueueFirst(Envelope envelope)
{
_prependBuffer.Push(envelope);
}
}
}

5 changes: 1 addition & 4 deletions src/core/Akka/Util/ListPriorityQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,4 @@ public bool IsConsistent()
return true; // passed all checks
} // IsConsistent
} // ListPriorityQueue


}

}
171 changes: 171 additions & 0 deletions src/core/Akka/Util/StableListPriorityQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
//-----------------------------------------------------------------------
// <copyright file="ListPriorityQueue.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using Akka.Actor;

namespace Akka.Util
{
/// <summary>
/// Priority queue implemented using a simple list with binary search for inserts.
/// This specific implementation is cheap in terms of memory but weak in terms of performance.
/// See http://visualstudiomagazine.com/articles/2012/11/01/priority-queues-with-c.aspx for original implementation
/// This specific version is adapted for Envelopes only and calculates a priority of envelope.Message
/// </summary>
public sealed class StableListPriorityQueue
{
private struct WrappedEnvelope
{
public WrappedEnvelope(Envelope envelope, int sequenceNumber)
{
Envelope = envelope;
SequenceNumber = sequenceNumber;
}

public Envelope Envelope { get; }
public int SequenceNumber { get; }
}

private class WrappedEnvelopeComparator
{
private readonly Func<object, int> priorityCalculator;

public WrappedEnvelopeComparator(Func<object, int> priorityCalculator)
{
this.priorityCalculator = priorityCalculator;
}

public int Compare(WrappedEnvelope x, WrappedEnvelope y)
{
var baseCompare = priorityCalculator(x.Envelope.Message).CompareTo(priorityCalculator(y.Envelope.Message));
if (baseCompare != 0) return baseCompare;
return x.SequenceNumber.CompareTo(y.SequenceNumber);
}
}

private readonly List<WrappedEnvelope> _data;
private readonly WrappedEnvelopeComparator comparator;

/// <summary>
/// The default priority generator.
/// </summary>
internal static readonly Func<object, int> DefaultPriorityCalculator = message => 1;
private int sequenceNumber;

/// <summary>
/// Creates a new priority queue.
/// </summary>
/// <param name="initialCapacity">The initial capacity of the queue.</param>
/// <param name="priorityCalculator">The calculator function for assigning message priorities.</param>
public StableListPriorityQueue(int initialCapacity, Func<object, int> priorityCalculator)
{
_data = new List<WrappedEnvelope>(initialCapacity);
comparator = new WrappedEnvelopeComparator(priorityCalculator);
}

/// <summary>
/// Enqueues a message into the priority queue.
/// </summary>
/// <param name="item">The item to enqueue.</param>
public void Enqueue(Envelope item)
{
int seq = Interlocked.Increment(ref sequenceNumber);
var wrappedItem = new WrappedEnvelope(item, seq);

_data.Add(wrappedItem);
var ci = _data.Count - 1; // child index; start at end
while (ci > 0)
{
var pi = (ci - 1) / 2; // parent index
if (comparator.Compare(_data[ci], _data[pi]) >= 0) break; // child item is larger than (or equal) parent so we're done
var tmp = _data[ci]; _data[ci] = _data[pi]; _data[pi] = tmp;
ci = pi;
}
}

/// <summary>
/// Dequeues the highest priority message at the front of the priority queue.
/// </summary>
/// <returns>The highest priority message <see cref="Envelope"/>.</returns>
public Envelope Dequeue()
{
// assumes pq is not empty; up to calling code
var li = _data.Count - 1; // last index (before removal)
var frontItem = _data[0]; // fetch the front
_data[0] = _data[li];
_data.RemoveAt(li);

--li; // last index (after removal)
var pi = 0; // parent index. start at front of pq
while (true)
{
var ci = pi * 2 + 1; // left child index of parent
if (ci > li) break; // no children so done
var rc = ci + 1; // right child
if (rc <= li && comparator.Compare(_data[rc], _data[ci]) < 0) // if there is a rc (ci + 1), and it is smaller than left child, use the rc instead
ci = rc;
if (comparator.Compare(_data[pi], _data[ci]) <= 0) break; // parent is smaller than (or equal to) smallest child so done
var tmp = _data[pi]; _data[pi] = _data[ci]; _data[ci] = tmp; // swap parent and child
pi = ci;
}
return frontItem.Envelope;
}

/// <summary>
/// Peek at the message at the front of the priority queue.
/// </summary>
/// <returns>The highest priority message <see cref="Envelope"/>.</returns>
public Envelope Peek()
{
return _data[0].Envelope;
}

/// <summary>
/// Counts the number of items in the priority queue.
/// </summary>
/// <returns>The total number of items in the queue.</returns>
public int Count()
{
return _data.Count;
}

/// <summary>
/// Converts the queue to a string representation.
/// </summary>
/// <returns>A string representation of the queue.</returns>
public override string ToString()
{
var s = "";
for (var i = 0; i < _data.Count; ++i)
s += _data[i].ToString() + " ";
s += "count = " + _data.Count;
return s;
}

/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
public bool IsConsistent()
{
// is the heap property true for all data?
if (_data.Count == 0) return true;
var li = _data.Count - 1; // last index
for (var pi = 0; pi < _data.Count; ++pi) // each parent index
{
var lci = 2 * pi + 1; // left child index
var rci = 2 * pi + 2; // right child index

if (lci <= li && comparator.Compare(_data[pi], _data[lci]) > 0) return false; // if lc exists and it's greater than parent then bad.
if (rci <= li && comparator.Compare(_data[pi], _data[rci]) > 0) return false; // check the right child too.
}
return true; // passed all checks
}
}
}

0 comments on commit 3ebec1f

Please sign in to comment.