Permalink
Browse files

first version of the work stealing idea. Added a dispatcher which con…

…siders all actors dispatched in that dispatcher part of the same pool of actors. Added a test to verify that a fast actor steals work from a slower actor.
  • Loading branch information...
1 parent d46504f commit 4ee9078c71b90f4abf4fd7f71d608d5e32bd888a @janvanbesien janvanbesien committed Mar 3, 2010
Showing with 24 additions and 11 deletions.
  1. +17 −11 akka-core/src/main/scala/actor/Actor.scala
  2. +7 −0 akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -18,9 +18,9 @@ import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
import org.multiverse.api.ThreadLocalTransaction._
-import java.util.{Queue, HashSet}
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.{Deque, HashSet}
import java.net.InetSocketAddress
+import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingDeque}
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@@ -212,12 +212,14 @@ trait Actor extends TransactionManagement {
@volatile private[this] var _isShutDown = false
@volatile private[this] var _isEventBased: Boolean = false
@volatile private[akka] var _isKilled = false
+ @volatile private[akka] var _isDispatching = false
+
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
private[akka] var _replyToAddress: Option[InetSocketAddress] = None
- private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
+ private[akka] val _mailbox: Deque[MessageInvocation] = new LinkedBlockingDeque[MessageInvocation]
// ====================================
// protected fields
@@ -860,14 +862,18 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
- private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
- try {
- if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
- else dispatch(messageHandle)
- } catch {
- case e =>
- Actor.log.error(e, "Could not invoke actor [%s]", this)
- throw e
+ private[akka] def invoke(messageHandle: MessageInvocation) = {
+ _isDispatching = true
+ synchronized {
+ try {
+ if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
+ else dispatch(messageHandle)
+ } catch {
+ case e =>
+ Actor.log.error(e, "Could not invoke actor [%s]", this)
+ throw e
+ }
+ _isDispatching = false
}
}
@@ -51,6 +51,13 @@ object Dispatchers {
def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
/**
+ * Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name)
+
+ /**
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.

0 comments on commit 4ee9078

Please sign in to comment.