From 81f26ef7a81391b86fc6e48686977522c6dc7fae Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 20 Dec 2012 23:28:08 +0100 Subject: [PATCH] fix sometimes-off-by-one in PeekMailbox, see #2851 --- .../akka/contrib/mailbox/PeekMailbox.scala | 28 +++++++++---------- .../contrib/mailbox/PeekMailboxSpec.scala | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala index bb580bccb90..73e687b179d 100644 --- a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala +++ b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala @@ -1,9 +1,7 @@ package akka.contrib.mailbox import java.util.concurrent.{ ConcurrentHashMap, ConcurrentLinkedQueue } - import com.typesafe.config.Config - import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.dispatch.{ Envelope, MailboxType, MessageQueue, QueueBasedMessageQueue, UnboundedMessageQueueSemantics } @@ -40,15 +38,16 @@ class PeekMailboxExtension(val system: ExtendedActorSystem) extends Extension { class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match { case (Some(o), Some(s)) ⇒ - val tries = config.getInt("max-tries") ensuring (_ >= 1, "max-tries must be at least 1") - val mailbox = new PeekMailbox(o, s, tries) + val retries = config.getInt("max-retries") + if (retries < 1) throw new akka.ConfigurationException("max-retries must be at least 1") + val mailbox = new PeekMailbox(o, s, retries) PeekMailboxExtension(s).register(o, mailbox) mailbox case _ ⇒ throw new Exception("no mailbox owner or system given") } } -class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int) +class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() @@ -58,20 +57,21 @@ class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int) * continue handing back out that same message until ACKed, peek() must be * used. The retry limit logic is then formulated in terms of the `tries` * field, which holds - * 0 if clean slate (i.e. last dequeue was ack()ed) - * 1..maxTries if not yet ack()ed - * Marker if last try was done (at which point we had to poll()) - * -1 during cleanUp (in order to disable the ack() requirement) + * 0 if clean slate (i.e. last dequeue was ack()ed) + * 1..maxRetries if not yet ack()ed + * Marker if last try was done (at which point we had to poll()) + * -1 during cleanUp (in order to disable the ack() requirement) */ // the mutable state is only ever accessed by the actor (i.e. dequeue() side) var tries = 0 - val Marker = maxTries + 1 + val Marker = maxRetries + 1 + // this logic does not work if maxRetries==0, but then you could also use a normal mailbox override def dequeue(): Envelope = tries match { - case -1 ⇒ queue.poll() - case 0 | Marker ⇒ tries = 1; queue.peek() - case `maxTries` ⇒ tries = Marker; queue.poll() - case n ⇒ tries = n + 1; queue.peek() + case -1 ⇒ queue.poll() + case 0 | Marker ⇒ val e = queue.peek(); tries = if (e eq null) 0 else 1; e + case `maxRetries` ⇒ tries = Marker; queue.poll() + case n ⇒ tries = n + 1; queue.peek() } def ack(): Unit = { diff --git a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala index 1c27ce48287..4a2f68109d0 100644 --- a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala @@ -32,7 +32,7 @@ object PeekMailboxSpec { class PeekMailboxSpec extends AkkaSpec(""" peek-dispatcher { mailbox-type = "akka.contrib.mailbox.PeekMailboxType" - max-tries = 3 + max-retries = 2 } """) with ImplicitSender {