Skip to content

Commit

Permalink
Fixing neglected configuration in WorkStealer
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorklang committed Feb 9, 2011
1 parent bd26086 commit 8ffad0e
Showing 1 changed file with 21 additions and 8 deletions.
Expand Up @@ -11,7 +11,7 @@ import akka.util.Switch
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
import java.util.concurrent.atomic.AtomicReference

import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
import jsr166x.{Deque, LinkedBlockingDeque}

/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
Expand Down Expand Up @@ -192,23 +192,36 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"

private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque
new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
case UnboundedMailbox(blockDequeue) =>
new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable {
final def enqueue(handle: MessageInvocation) {
this add handle
}

def dequeue: MessageInvocation = this.poll()
final def dequeue(): MessageInvocation = {
if (blockDequeue) this.take()
else this.poll()
}

def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
}
}
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
case BoundedMailbox(blockDequeue, capacity, pushTimeOut) =>
new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle)

def dequeue: MessageInvocation = this.poll()
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
} else this put handle
}

final def dequeue(): MessageInvocation =
if (blockDequeue) this.take()
else this.poll()

def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
Expand Down

0 comments on commit 8ffad0e

Please sign in to comment.