Skip to content

Commit

Permalink
Merge pull request #975 from akka/wip-2.1-2851-PeekMailbox-∂π
Browse files Browse the repository at this point in the history
2.1: fix sometimes-off-by-one in PeekMailbox, see #2851
  • Loading branch information
rkuhn committed Dec 21, 2012
2 parents 281455e + 81f26ef commit 58bc74b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
28 changes: 14 additions & 14 deletions akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala
@@ -1,9 +1,7 @@
package akka.contrib.mailbox

import java.util.concurrent.{ ConcurrentHashMap, ConcurrentLinkedQueue }

import com.typesafe.config.Config

import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, QueueBasedMessageQueue, UnboundedMessageQueueSemantics }

Expand Down Expand Up @@ -40,15 +38,16 @@ class PeekMailboxExtension(val system: ExtendedActorSystem) extends Extension {
class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match {
case (Some(o), Some(s))
val tries = config.getInt("max-tries") ensuring (_ >= 1, "max-tries must be at least 1")
val mailbox = new PeekMailbox(o, s, tries)
val retries = config.getInt("max-retries")
if (retries < 1) throw new akka.ConfigurationException("max-retries must be at least 1")
val mailbox = new PeekMailbox(o, s, retries)
PeekMailboxExtension(s).register(o, mailbox)
mailbox
case _ throw new Exception("no mailbox owner or system given")
}
}

class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int)
class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int)
extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()

Expand All @@ -58,20 +57,21 @@ class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int)
* continue handing back out that same message until ACKed, peek() must be
* used. The retry limit logic is then formulated in terms of the `tries`
* field, which holds
* 0 if clean slate (i.e. last dequeue was ack()ed)
* 1..maxTries if not yet ack()ed
* Marker if last try was done (at which point we had to poll())
* -1 during cleanUp (in order to disable the ack() requirement)
* 0 if clean slate (i.e. last dequeue was ack()ed)
* 1..maxRetries if not yet ack()ed
* Marker if last try was done (at which point we had to poll())
* -1 during cleanUp (in order to disable the ack() requirement)
*/
// the mutable state is only ever accessed by the actor (i.e. dequeue() side)
var tries = 0
val Marker = maxTries + 1
val Marker = maxRetries + 1

// this logic does not work if maxRetries==0, but then you could also use a normal mailbox
override def dequeue(): Envelope = tries match {
case -1 queue.poll()
case 0 | Marker tries = 1; queue.peek()
case `maxTries` tries = Marker; queue.poll()
case n tries = n + 1; queue.peek()
case -1 queue.poll()
case 0 | Marker val e = queue.peek(); tries = if (e eq null) 0 else 1; e
case `maxRetries` tries = Marker; queue.poll()
case n tries = n + 1; queue.peek()
}

def ack(): Unit = {
Expand Down
Expand Up @@ -32,7 +32,7 @@ object PeekMailboxSpec {
class PeekMailboxSpec extends AkkaSpec("""
peek-dispatcher {
mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
max-tries = 3
max-retries = 2
}
""") with ImplicitSender {

Expand Down

0 comments on commit 58bc74b

Please sign in to comment.