TimerBasedThrottler contribution module #835

Merged
merged 5 commits into from Nov 6, 2012

Projects

None yet

5 participants

@hbf
hbf commented Oct 31, 2012

I've finally added the timer-based throttler implementation.

  • Tests succeed.
  • Documentation (in akka-contrib/docs) updated and verified using sphinx:generate-html

Let me know if there are any problems with this!

Cheers,
Kaspar

Kaspar Fischer (hbf) contribution module throttle 0b2d3df
@akka-ci
Collaborator
akka-ci commented Oct 31, 2012

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/45/

@akka-ci
Collaborator
akka-ci commented Oct 31, 2012

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/45/

@hbf hbf referenced this pull request Nov 1, 2012
Closed

Basic throttler implementation #627

@rkuhn
Collaborator
rkuhn commented Nov 2, 2012

I’ll take a look when I’m back from vacation next week.

@hbf
hbf commented Nov 2, 2012

Cool, I'll be available for follow-up changes. Enjoy your vacation!

@rkuhn rkuhn and 1 other commented on an outdated diff Nov 5, 2012
...scala/akka/contrib/throttle/TimerBasedThrottler.scala
+ * Set the rate of a [[akka.contrib.throttle.Throttler]].
+ *
+ * You may change a throttler's rate at any time.
+ *
+ * @param rate the rate at which messages will be delivered to the target of the throttler
+ */
+ case class SetRate(rate: Rate)
+
+ import language.implicitConversions
+
+ /**
+ * Helper for some syntactic sugar.
+ *
+ * @see [[akka.contrib.throttle.Throttler.Rate]]
+ */
+ implicit class RateInt(numberOfCalls: Int) {
@rkuhn
rkuhn Nov 5, 2012 Collaborator

would be nicer to make this extends AnyVal

@rkuhn rkuhn and 1 other commented on an outdated diff Nov 5, 2012
...a/akka/contrib/throttle/TimerBasedThrottlerSpec.scala
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.BeforeAndAfterAll
+
+object TimerBasedThrottlerSpec {
+ class EchoActor extends Actor {
+ def receive = {
+ case x sender ! x
+ }
+ }
+}
+
+@RunWith(classOf[JUnitRunner])
+class TimerBasedThrottlerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+ with WordSpec with MustMatchers with BeforeAndAfterAll {
+
+ def this() = this(ActorSystem("TimerBasedThrottlerSpec"))
@rkuhn
rkuhn Nov 5, 2012 Collaborator

why this dance? just extends TestKit(ActorSystem("TimerBasedThrottlerSpec"))

@hbf
hbf Nov 5, 2012

I've copied that from somewhere, but makes no sense here indeed.

@rkuhn rkuhn and 1 other commented on an outdated diff Nov 5, 2012
...a/akka/contrib/throttle/TimerBasedThrottlerSpec.scala
+ "respect the rate (3 msg/s)" in {
+ val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor])
+ val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second))))
+ throttler ! SetTarget(Some(echo))
+ throttler ! "1"
+ throttler ! "2"
+ throttler ! "3"
+ throttler ! "4"
+ throttler ! "5"
+ throttler ! "6"
+ throttler ! "7"
+ within(1 second) {
+ expectMsg("1")
+ expectMsg("2")
+ expectMsg("3")
+ expectNoMsg(remaining)
@rkuhn
rkuhn Nov 5, 2012 Collaborator

no need to say remaining here, that’s implied

@rkuhn
rkuhn Nov 5, 2012 Collaborator

same applies below

@hbf
hbf Nov 5, 2012

Hm, I think not: After sending 1-3, we want to wait a full second. Without the expectNoMsg(remaining), we'd expect 4-6 too early.

@rkuhn
rkuhn Nov 6, 2012 Collaborator

What I meant is that removing remaining does not change the behavior:

def expectNoMsg() { expectNoMsg_internal(remaining) }

(from TestKit.scala)

@hbf
hbf Nov 6, 2012

Ah, this way, thanks. – I removed the argument in the tests, see hbf@fb237b8

@rkuhn rkuhn commented on the diff Nov 5, 2012
...a/akka/contrib/throttle/TimerBasedThrottlerSpec.scala
+ expectMsg("4")
+ expectNoMsg(remaining)
+ }
+ within(1 second) {
+ expectMsg("5")
+ expectMsg("6")
+ expectMsg("7")
+ expectMsg("8")
+ expectNoMsg(remaining)
+ }
+ within(1 second) {
+ expectMsg("9")
+ }
+ }
+ }
+}
@rkuhn
rkuhn Nov 5, 2012 Collaborator

how about also testing SetTarget(None) with/without queued messages?

@hbf
hbf Nov 5, 2012

Always good idea – and indeed: found a bug. Two more tests are in now.

@rkuhn
Collaborator
rkuhn commented Nov 5, 2012

Great stuff! Even with picture ;-)

Kaspar Fisch... added some commits Nov 5, 2012
@hbf
hbf commented Nov 5, 2012

The new code (with the bug fix) is available here: hbf@4917680

@akka-ci
Collaborator
akka-ci commented Nov 5, 2012

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/45/

@akka-ci
Collaborator
akka-ci commented Nov 5, 2012

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/45/

@rkuhn
Collaborator
rkuhn commented Nov 6, 2012

Akka guys, there’s just one very little comment outstanding from my side on this, could you please also review? Thanks.

@akka-ci
Collaborator
akka-ci commented Nov 6, 2012

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/62/

@akka-ci
Collaborator
akka-ci commented Nov 6, 2012

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/62/

@patriknw patriknw and 2 others commented on an outdated diff Nov 6, 2012
...scala/akka/contrib/throttle/TimerBasedThrottler.scala
+ *
+ * == Processing messages ==
+ * The target should process messages as fast as possible. If the target requires substantial time to
+ * process messages, it should distribute its work to other actors (using for example something like
+ * a `BalancingDispatcher`), otherwise the resulting system will always work <em>below</em>
+ * the threshold rate.
+ *
+ * <em>Example:</em> Suppose the throttler has a rate of 3msg/s and the target requires 1s to process a message.
+ * This system will only process messages at a rate of 1msg/s: the target will receive messages at at most 3msg/s
+ * but as it handles them synchronously and each of them takes 1s, its inbox will grow and grow. In such
+ * a situation, the target should <em>distribute</em> its messages to a set of worker actors so that individual messages
+ * can be handled in parallel.
+ *
+ * @see [[akka.contrib.throttle.TimerBasedThrottler]]
+ */
+trait Throttler { self: Actor }
@patriknw
patriknw Nov 6, 2012 Member

I'm always avoiding self for the self type of actors, because Actor has val self: ActorRef.
It might not be a problem.

@rkuhn
rkuhn Nov 6, 2012 Collaborator

It’s not problematic here, since this self name is only visible within the Throttler trait, but I agree that it should be changed to this: Actor as a matter of style.

@patriknw patriknw commented on the diff Nov 6, 2012
...scala/akka/contrib/throttle/TimerBasedThrottler.scala
+ * Set the rate of a [[akka.contrib.throttle.Throttler]].
+ *
+ * You may change a throttler's rate at any time.
+ *
+ * @param rate the rate at which messages will be delivered to the target of the throttler
+ */
+ case class SetRate(rate: Rate)
+
+ import language.implicitConversions
+
+ /**
+ * Helper for some syntactic sugar.
+ *
+ * @see [[akka.contrib.throttle.Throttler.Rate]]
+ */
+ implicit class RateInt(val numberOfCalls: Int) extends AnyVal {
@patriknw patriknw commented on the diff Nov 6, 2012
...a/akka/contrib/throttle/TimerBasedThrottlerSpec.scala
+ expectMsg("3")
+ expectNoMsg()
+ }
+ expectNoMsg(1 second)
+ throttler ! SetTarget(Some(echo))
+ throttler ! "4"
+ throttler ! "5"
+ throttler ! "6"
+ throttler ! "7"
+ within(1 seconds) {
+ expectMsg("4")
+ expectMsg("5")
+ expectMsg("6")
+ expectNoMsg()
+ }
+ within(1 second) {
@patriknw
patriknw Nov 6, 2012 Member

I'm afraid this will not work on the slow jenkins machine with timefactor = 5. The within durations are dilated but the durations passed to the throttler is not. Shouldn't all durations be dilated. Perhaps it also needs to be taggedAs TimingTest.

@hbf
hbf Nov 6, 2012

Ah, I didn't know this.

I added dilation to the throttler rates and tested it by (locally and temporarily) setting timefactor = 5.0 in akka/akka-testkit/src/main/resources/reference.conf. With this, the tests still pass.

See hbf@aeb4e47

@patriknw
Member
patriknw commented Nov 6, 2012

Looks great! Consider the timing sensitivity in the tests.

@rkuhn rkuhn merged commit 15a46c0 into akka:master Nov 6, 2012
@rkuhn
Collaborator
rkuhn commented Nov 6, 2012

I merged this into master and am in the process of backporting to release-2.1, expect new PR shortly.

@hbf
hbf commented Nov 8, 2012

Hm, not being a Scala expert, I see in the test:

  override def afterAll {
    system.shutdown()
  }

Isn't there a = missing? Why does it work at all?

@viktorklang
Member

def x { } is called "procedure" style, it is shorthand for:
def x: Unit = {}

@hbf
hbf commented Nov 8, 2012

Appreciated, Viktor, thanks!

(I have added this to Stackoverflow, http://stackoverflow.com/questions/13300038, so hopefully your answer helps others, too.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment