Skip to content

Commit

Permalink
Merge branch 'master' of github.com:jboner/akka
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Bonér committed Mar 30, 2011
2 parents 3b18943 + d476303 commit 8760c00
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 34 deletions.
Expand Up @@ -78,12 +78,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(

override private[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
if (mbox.dispatcherLock.locked && attemptDonationOf(invocation, mbox)) {
/*if (!mbox.isEmpty && attemptDonationOf(invocation, mbox)) {
//We were busy and we got to donate the message to some other lucky guy, we're done here
} else {
} else {*/
mbox enqueue invocation
registerForExecution(mbox)
}
//}
}

override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
Expand All @@ -110,13 +110,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/**
* Returns true if the donation succeeded or false otherwise
*/
protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
/*protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
val actors = members // copy to prevent concurrent modifications having any impact
doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
case null => false
case recipient => donate(message, recipient)
}
}
}*/

/**
* Rewrites the message and adds that message to the recipients mailbox
Expand Down
20 changes: 11 additions & 9 deletions akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala
Expand Up @@ -14,6 +14,8 @@ import java.util.concurrent.TimeUnit

import akka.util.duration._

import akka.Testing

object FSMActorSpec {


Expand Down Expand Up @@ -100,30 +102,30 @@ class FSMActorSpec extends JUnitSuite {
def unlockTheLock = {

// lock that locked after being open for 1 sec
val lock = Actor.actorOf(new Lock("33221", (1, TimeUnit.SECONDS))).start
val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start

val transitionTester = Actor.actorOf(new Actor { def receive = {
case Transition(_, _, _) => transitionCallBackLatch.open
case CurrentState(_, Locked) => initialStateLatch.open
}}).start

lock ! SubscribeTransitionCallBack(transitionTester)
assert(initialStateLatch.tryAwait(1, TimeUnit.SECONDS))
assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))

lock ! '3'
lock ! '3'
lock ! '2'
lock ! '2'
lock ! '1'

assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
assert(transitionLatch.tryAwait(1, TimeUnit.SECONDS))
assert(transitionCallBackLatch.tryAwait(1, TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))
assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))


lock ! "not_handled"
assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS))
assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))

val answerLatch = new StandardLatch
object Hello
Expand All @@ -136,9 +138,9 @@ class FSMActorSpec extends JUnitSuite {
}
}).start
tester ! Hello
assert(answerLatch.tryAwait(2, TimeUnit.SECONDS))
assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))

tester ! Bye
assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS))
assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
}
}
17 changes: 9 additions & 8 deletions akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala
Expand Up @@ -2,6 +2,7 @@ package akka.actor

import akka.testkit.TestKit
import akka.util.duration._
import akka.Testing

import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
Expand All @@ -16,7 +17,7 @@ class FSMTimingSpec

val fsm = Actor.actorOf(new StateMachine(testActor)).start
fsm ! SubscribeTransitionCallBack(testActor)
expectMsg(200 millis, CurrentState(fsm, Initial))
expectMsg(Testing.time(200).millis, CurrentState(fsm, Initial))

ignoreMsg {
case Transition(_, Initial, _) => true
Expand All @@ -25,15 +26,15 @@ class FSMTimingSpec
"A Finite State Machine" must {

"receive StateTimeout" in {
within (50 millis, 150 millis) {
within (Testing.time(50).millis, Testing.time(150).millis) {
fsm ! TestStateTimeout
expectMsg(Transition(fsm, TestStateTimeout, Initial))
expectNoMsg
}
}

"receive single-shot timer" in {
within (50 millis, 150 millis) {
within (Testing.time(50).millis, Testing.time(150).millis) {
fsm ! TestSingleTimer
expectMsg(Tick)
expectMsg(Transition(fsm, TestSingleTimer, Initial))
Expand All @@ -47,29 +48,29 @@ class FSMTimingSpec
case Tick => Tick
}
seq must have length (5)
within(250 millis) {
within(Testing.time(250) millis) {
expectMsg(Transition(fsm, TestRepeatedTimer, Initial))
expectNoMsg
}
}

"notify unhandled messages" in {
fsm ! TestUnhandled
within(100 millis) {
within(Testing.time(100) millis) {
fsm ! Tick
expectNoMsg
}
within(100 millis) {
within(Testing.time(100) millis) {
fsm ! SetHandler
fsm ! Tick
expectMsg(Unhandled(Tick))
expectNoMsg
}
within(100 millis) {
within(Testing.time(100) millis) {
fsm ! Unhandled("test")
expectNoMsg
}
within(100 millis) {
within(Testing.time(100) millis) {
fsm ! Cancel
expectMsg(Transition(fsm, TestUnhandled, Initial))
}
Expand Down
17 changes: 9 additions & 8 deletions akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{Duration, Switch}
import akka.Testing

object ActorModelSpec {

Expand Down Expand Up @@ -224,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite {
a.start

a ! CountDown(start)
assertCountDown(start,3000, "Should process first message within 3 seconds")
assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)

a ! Wait(1000)
a ! CountDown(oneAtATime)
// in case of serialization violation, restart would happen instead of count down
assertCountDown(oneAtATime,1500,"Processed message when allowed")
assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)

a.stop
Expand All @@ -245,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite {

def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
for (i <- 1 to 10) { start }
assertCountDown(counter, 3000, "Should process 200 messages")
assertCountDown(counter, Testing.time(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)

a.stop
Expand All @@ -263,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite {
val aStart,aStop,bParallel = new CountDownLatch(1)

a ! Meet(aStart,aStop)
assertCountDown(aStart,3000, "Should process first message within 3 seconds")
assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds")

b ! CountDown(bParallel)
assertCountDown(bParallel, 3000, "Should process other actors in parallel")
assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel")

aStop.countDown()
a.stop
Expand All @@ -281,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite {
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, 3000, "Should be suspended+resumed and done with next message within 3 seconds")
assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
Expand All @@ -297,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite {
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)

dispatcher.resume(a)
assertCountDown(done, 3000, "Should resume processing of messages when resumed")
assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)

Expand All @@ -314,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite {
(1 to num) foreach {
_ => newTestActor.start ! cachedMessage
}
assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns")
assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns")
}
for(run <- 1 to 3) {
flood(10000)
Expand Down
Expand Up @@ -12,6 +12,8 @@ import akka.config.Supervision._
import java.util.concurrent.CountDownLatch
import akka.config.TypedActorConfigurator

import akka.Testing

/**
* @author Martin Krasser
*/
Expand Down Expand Up @@ -95,7 +97,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}

it("should be stopped when supervision cannot handle the problem in") {
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
Expand All @@ -121,7 +123,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}

it("should be restarted when supervision handles the problem in") {
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
Expand All @@ -146,4 +148,4 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
}
}
}
}
2 changes: 1 addition & 1 deletion project/build/AkkaProject.scala
Expand Up @@ -19,7 +19,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val scalaCompileSettings =
Seq("-deprecation",
"-Xmigration",
//"-optimise",
"-optimise",
"-encoding", "utf8")

val javaCompileSettings = Seq("-Xlint:unchecked")
Expand Down

0 comments on commit 8760c00

Please sign in to comment.