diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index c439a073be9..3d4a6c439b1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -11,7 +11,7 @@ import akka.util.Switch import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList} import java.util.concurrent.atomic.AtomicReference -import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque} +import jsr166x.{Deque, LinkedBlockingDeque} /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -192,11 +192,16 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { - case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque - new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { - def enqueue(handle: MessageInvocation): Unit = this.add(handle) + case UnboundedMailbox(blockDequeue) => + new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable { + final def enqueue(handle: MessageInvocation) { + this add handle + } - def dequeue: MessageInvocation = this.poll() + final def dequeue(): MessageInvocation = { + if (blockDequeue) this.take() + else this.poll() + } def run = if (!tryProcessMailbox(this)) { // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox @@ -204,11 +209,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) } } - case BoundedMailbox(blocking, capacity, pushTimeOut) => + case BoundedMailbox(blockDequeue, capacity, pushTimeOut) => new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable { - def enqueue(handle: MessageInvocation): Unit = this.add(handle) - def dequeue: MessageInvocation = this.poll() + final def enqueue(handle: MessageInvocation) { + if (pushTimeOut.toMillis > 0) { + if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) + } else this put handle + } + + final def dequeue(): MessageInvocation = + if (blockDequeue) this.take() + else this.poll() def run = if (!tryProcessMailbox(this)) { // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox