Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler that manually advances time #24243

Merged
merged 11 commits into from
Jan 31, 2018
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed;

//#manual-scheduling-simple
import java.util.concurrent.TimeUnit;
import static com.typesafe.config.ConfigFactory.parseString;

import scala.concurrent.duration.Duration;

import akka.actor.typed.javadsl.Behaviors;

import org.junit.Test;

import akka.testkit.typed.TestKit;
import akka.testkit.typed.ExplicitlyTriggeredScheduler;
import akka.testkit.typed.javadsl.TestProbe;

public class ManualTimerTest extends TestKit {
ExplicitlyTriggeredScheduler scheduler;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps TriggeredScheduler is enough?


public ManualTimerTest() {
super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\""));
this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler();
}

static final class Tick {}
static final class Tock {}

@Test
public void testScheduleNonRepeatedTicks() {
TestProbe<Tock> probe = new TestProbe<>(system());
Behavior<Tick> behavior = Behaviors.withTimers(timer -> {
timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS));
return Behaviors.immutable( (ctx, tick) -> {
probe.ref().tell(new Tock());
return Behaviors.same();
});
});

spawn(behavior);

scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be more readable to put the expectNoMessageFor on the probe, with a ExplicitlyTriggeredScheduler parameter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to avoid confusion with probe.expectNoMessage, which will not automatically advance the time of the explicit scheduler. If we decide we want to have separate probes for normal vs manual schedulers anyway (see below) then it should indeed of course move to the probe.


scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS));
probe.expectMsgType(Tock.class);

scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe);
}


}
//#manual-scheduling-simple
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package akka.actor.typed

//#manual-scheduling-simple
import scala.concurrent.duration._

import akka.actor.typed.scaladsl.Behaviors

import org.scalatest.WordSpecLike

import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.{ ManualTime, TestProbe }

class ManualTimerSpec extends TestKit(ManualTime.config) with ManualTime with WordSpecLike {

"A timer" must {
"schedule non-repeated ticks" in {
case object Tick
case object Tock

val probe = TestProbe[Tock.type]()
val behavior = Behaviors.withTimers[Tick.type] { timer ⇒
timer.startSingleTimer("T", Tick, 10.millis)
Behaviors.immutable { (ctx, Tick) ⇒
probe.ref ! Tock
Behaviors.same
}
}

spawn(behavior)

scheduler.expectNoMessageFor(9.millis, probe)

scheduler.timePasses(2.millis)
probe.expectMsg(Tock)

scheduler.expectNoMessageFor(10.seconds, probe)
}
//#manual-scheduling-simple

"schedule repeated ticks" in {
case object Tick
case object Tock

val probe = TestProbe[Tock.type]()
val behavior = Behaviors.withTimers[Tick.type] { timer ⇒
timer.startPeriodicTimer("T", Tick, 10.millis)
Behaviors.immutable { (ctx, Tick) ⇒
probe.ref ! Tock
Behaviors.same
}
}

spawn(behavior)

for (_ ← Range(0, 5)) {
scheduler.expectNoMessageFor(9.millis, probe)

scheduler.timePasses(1.milli)
probe.expectMsg(Tock)
}
}

"replace timer" in {
sealed trait Command
case class Tick(n: Int) extends Command
case class SlowThenBump(nextCount: Int) extends Command
sealed trait Event
case class Tock(n: Int) extends Event

val probe = TestProbe[Event]("evt")
val interval = 10.millis

val behavior = Behaviors.withTimers[Command] { timer ⇒
timer.startPeriodicTimer("T", Tick(1), interval)
Behaviors.immutable { (ctx, cmd) ⇒
cmd match {
case Tick(n) ⇒
probe.ref ! Tock(n)
Behaviors.same
case SlowThenBump(nextCount) ⇒
scheduler.timePasses(interval)
timer.startPeriodicTimer("T", Tick(nextCount), interval)
Behaviors.same
}
}
}

val ref = spawn(behavior)

scheduler.timePasses(11.millis)
probe.expectMsg(Tock(1))

// next Tock(1) enqueued in mailboxed, but should be discarded because of new timer
ref ! SlowThenBump(2)
scheduler.expectNoMessageFor(interval, probe)

scheduler.timePasses(interval)
probe.expectMsg(Tock(2))
}

//#manual-scheduling-simple
}
}
//#manual-scheduling-simple
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,15 @@ object AskPattern {
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*/
def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
// We do not currently use the implicit scheduler, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219
ref match {
case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f)
case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f)
case a ⇒ throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass)
}
}
}

private val onTimeout: String ⇒ Throwable = msg ⇒ new TimeoutException(msg)
Expand Down
13 changes: 13 additions & 0 deletions akka-docs/src/main/paradox/typed/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,16 @@ Scala
Java
: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous }

### Controlling the scheduler

It can be hard to reliably unit test specific scenario's when your actor relies on timing:
especially when running many tests in parallel it can be hard to get the timing just right.
Making such tests more reliable by using generous timeouts make the tests take a long time to run.

For such situations, we provide a scheduler where you can manually, explicitly advance the clock.

Scala
: @@snip [ManualTimerSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala) { #manual-scheduling-simple }

Java
: @@snip [ManualTimerTest.scala]($akka$/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java) { #manual-scheduling-simple }
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package akka.testkit.typed

import java.util.concurrent.ThreadFactory

import com.typesafe.config.Config

import scala.annotation.varargs
import scala.concurrent.duration.{ Duration, FiniteDuration }

import akka.event.LoggingAdapter

class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {

@varargs
def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = {
timePasses(duration)
on.foreach(_.expectNoMessage(Duration.Zero))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase {
def this(name: String) = this(name, None)
def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config))
def this(name: String, config: Config) = this(name, Some(config))

import TestKit._
implicit val system = ActorSystem(testKitGuardian, name, config = config)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package akka.testkit.typed.scaladsl

import com.typesafe.config.{ Config, ConfigFactory }

import akka.testkit.typed._

object ManualTime {
val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""")
}
trait ManualTime { self: TestKit ⇒
override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package akka.testkit

import java.util.concurrent.ThreadFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.Config

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.Try

import akka.actor.Cancellable
import akka.actor.Scheduler
import akka.event.LoggingAdapter

/**
* For testing: scheduler that does not look at the clock, but must be
* progressed manually by calling `timePasses`.
*
* This allows for faster and less timing-sensitive specs, as jobs will be
* executed on the test thread instead of using the original
* {ExecutionContext}. This means recreating specific scenario's becomes
* easier, but these tests might fail to catch race conditions that only
* happen when tasks are scheduled in parallel in 'real time'.
*/
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {

private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable)

private val currentTime = new AtomicLong()
private val scheduled = new ConcurrentHashMap[Item, Unit]()

override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, Some(interval), runnable)

override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
schedule(delay, None, runnable)

/**
* Advance the clock by the specified duration, executing all outstanding jobs on the calling thread before returning.
*
* We will not add a dilation factor to this amount, since the scheduler API also does not apply dilation.
* If you want the amount of time passed to be dilated, apply the dilation before passing the delay to
* this method.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"This will execute all outstanding jobs on the calling thread before returning" or such?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍. (It will also execute any jobs that are scheduled to be executed within the specified period by those outstanding jobs, but that might be a subtlety that doesn't really belong in the scaladoc here)

def timePasses(amount: FiniteDuration) = {
// Give dispatchers time to clear :(. See
// https://github.com/akka/akka/pull/24243#discussion_r160985493
// for some discussion on how to deal with this properly.
Thread.sleep(100)

val newTime = currentTime.get + amount.toMillis
if (log.isDebugEnabled)
log.debug(s"Time proceeds from ${currentTime.get} to $newTime, currently scheduled for this period:" + scheduledTasks(newTime).map(item ⇒ s"\n- $item"))

executeTasks(newTime)
currentTime.set(newTime)
}

private def scheduledTasks(runTo: Long): Seq[Item] =
scheduled
.keySet()
.asScala
.filter(_.time <= runTo)
.toList
.sortBy(_.time)

@tailrec
private[testkit] final def executeTasks(runTo: Long): Unit = {
scheduledTasks(runTo).headOption match {
case Some(task) ⇒
currentTime.set(task.time)
val runResult = Try(task.runnable.run())
scheduled.remove(task)

if (runResult.isSuccess)
task.interval.foreach(i ⇒ scheduled.put(task.copy(time = task.time + i.toMillis), ()))

// running the runnable might have scheduled new events
executeTasks(runTo)
case _ ⇒ // Done
}
}

private def schedule(initialDelay: FiniteDuration, interval: Option[FiniteDuration], runnable: Runnable): Cancellable = {
val firstTime = currentTime.get + initialDelay.toMillis
val item = Item(firstTime, interval, runnable)
log.debug("Scheduled item for {}: {}", firstTime, item)
scheduled.put(item, ())

if (initialDelay <= Duration.Zero)
executeTasks(currentTime.get)

new Cancellable {
var cancelled = false

override def cancel(): Boolean = {
val before = scheduled.size
scheduled.remove(item)
cancelled = true
before > scheduled.size
}

override def isCancelled: Boolean = cancelled
}
}

override def maxFrequency: Double = 42
}