Low throughput when using Flow.throttle with low maxBurst #23475

Closed
mmilewski opened this issue Aug 6, 2017 · 20 comments
@mmilewski

mmilewski commented Aug 6, 2017

When I run

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}

import scala.concurrent.Await
import scala.concurrent.duration._

object WhatsWithThrottling {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    val startMillis = System.currentTimeMillis()

    val numRecords = 1000
    val future = Source.fromIterator(() => Iterator.continually(1))
      .throttle(numRecords, per = 1.second, maximumBurst = 0, ThrottleMode.shaping)
      .take(numRecords)
      .runForeach(x => x)
    
    Await.result(future, atMost = 1.hour)
    system.terminate()

    val endMillis = System.currentTimeMillis()
    val elapsedMillis = endMillis - startMillis
    println(s"Elapsed: $elapsedMillis ms")
  }

}

then I get "Elapsed: 30103 ms" which is ~33 QPS, not 1000
For numRecords=5000 I get "Elapsed: 150093 ms", so it linearly increased 5x.

When I run with a modification

      .throttle(numRecords, per = 1.second, maximumBurst = numRecords, ThrottleMode.shaping)

then I get "Elapsed: 115 ms"

When I run with a different modification

    val future = Source.fromIterator(() => Iterator.continually(1))
      .throttle(1000, per = 1.second, maximumBurst = 1000, ThrottleMode.shaping)
      .take(5000)
      .runForeach(x => x)

then I get "Elapsed: 4125 ms", which is how I use it now (but it does not feel intuitive to set up both params to the same value).

When I run with yet another modification (maximumBurst = 5), just in case a burst of 0 is the problem,

    val future = Source.fromIterator(() => Iterator.continually(1))
      .throttle(1000, per = 1.second, maximumBurst = 5, ThrottleMode.shaping)
      .take(1000)
      .runForeach(x => x)

then I get "Elapsed: 5041 ms" (down from 30s before) but still below expectations.

I'm not sure if the behaviour with low values for maxBurst is intended. Examples I find on the internet use 1 or 10 elements per second, which is too low to observe the problem.
That's something that would be good to clarify in the documentation (or at least give a real-world example, if .throttle(500, per = 1.second, maximumBurst = 500, shaping) is the expected usage).

Currently the documentation uses 1 QPS, so the burst does not matter that much:
http://doc.akka.io/docs/akka/2.5.3/scala/stream/stream-quickstart.html#time-based-processing

.throttle(1, 1.second, 1, ThrottleMode.shaping)

we use the throttle combinator to slow down the stream to 1 element per second (the second 1 in the argument list is the maximum size of a burst that we want to allow—passing 1 means that the first element gets through immediately and the second then has to wait for one second and so on).

Based on that I thought .throttle(1000, 1.second, 0, ThrottleMode.shaping) would give me 1000 QPS with 0 elements that get through immediately.

According to the documentation of Flow.throttle https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala#L1784-L1793

Whenever stream wants to send an element, it takes as many tokens from the bucket as number of elements. If there isn't any, throttle waits until the bucket accumulates enough tokens.

It seems that tokens are generated way too slowly for small buckets.
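
For reference, here is how I read the token bucket description above - a minimal sketch of my mental model, not Akka's actual implementation (the class and method names are made up):

import scala.concurrent.duration._

// Minimal token bucket sketch (illustrative only): tokens drop in at a fixed
// rate, can be saved up to `capacity`, and sending one element costs one token.
final class NaiveTokenBucket(capacity: Long, refillInterval: FiniteDuration) {
  private var tokens: Long = capacity            // "bucket is full when stream just materialized"
  private var lastRefill: Long = System.nanoTime()

  // Returns 0 if an element may be sent now, otherwise the nanos to wait for the next token.
  def tryTakeOne(): Long = {
    val now = System.nanoTime()
    val produced = (now - lastRefill) / refillInterval.toNanos
    if (produced > 0) {
      tokens = math.min(capacity, tokens + produced)
      lastRefill += produced * refillInterval.toNanos
    }
    if (tokens > 0) { tokens -= 1; 0L }
    else refillInterval.toNanos - (now - lastRefill)
  }
}

// Read literally, capacity = 0 would never let anything through, which is part of
// why the behaviour of maximumBurst = 0 was not obvious to me.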

I used

  • Scala 2.12.3
  • Akka Streams 2.5.3

Thoughts?

@johanandren
Member

To me the API docs are pretty clear on this:

Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drops into the bucket at a given rate and can be spared for later use up to bucket capacity to allow some burstiness. Whenever stream wants to send an element, it takes as many tokens from the bucket as number of elements. If there isn't any, throttle waits until the bucket accumulates enough tokens. Bucket is full when stream just materialized and started.

I'm not sure such a detailed description belongs in the "quickstart" but if you think this could be made more clear in the API docs, or in fact should be explained in the quickstart, a PR is definitely welcome!

@johanandren johanandren added the 0 - new, t:docs and t:stream labels Aug 9, 2017
@mmilewski
Author

Right, that's how I figured out that the burst is not on top of the baseline rate. I'm OK with adding a note to the quickstart (and the API docs) saying: "To achieve 1000 QPS you should use throttle(1000, 1.second, 1000, ThrottleMode.shaping)". The quickstart would link to the API docs.

@ktoso what's your opinion?

@johanandren
Member

The sample isn't about achieving 1000 QPS though, so that sounds weird.

I'm thinking of something more along the lines of replacing "(the second 1 in the argument list is the maximum size of a burst that ...)." with something that better describes the saving-up-to-a-burst behaviour.

It's hard to describe succinctly; maybe just end the parenthesis with a "read more in the API docs" link to the API docs?

@mmilewski
Author

Why not?

As a user of the quickstart guide, what I'm looking for is how to throttle at X QPS, not necessarily worrying about token buckets, spare tokens, ... If I want to know more about how throttle works, I'll dig into the API docs.
The basic use case is QPS, I think - no data to back that up though. If this wasn't an issue in the past, maybe it's just me.

@patriknw
Member

What would throttle(1000, 1.second, 100, ThrottleMode.shaping) look like?
I would expect 100 messages in each burst, at an interval of 100 ms (1/10 s).

One could probably go lower, but below a 20 ms interval per burst I don't think it will be very smooth.

Is that not how it's working?

@johanandren
Member

I hadn't thought about the throughput-greater-than-burst scenario at all; I thought the useful thing is rather the opposite: for example, throttle(100, 1.second, 1000, shaping) would, if there were no incoming elements for 10 seconds, have saved up tokens for a sudden burst of 1000 elements, but no more than that at any given time.

@patriknw
Member

Ok, I hadn't thought of that scenario :)
Both might be valid.
Sounds like someone should try/test these, see what the outcome is, and clarify the documentation. Would you like to do that @mmilewski?

@mmilewski
Author

With

import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import scala.collection.JavaConverters._
import scala.collection.immutable.{SortedMap, SortedSet}
import scala.concurrent.Await
import scala.concurrent.duration._

object WhatsWithThrottling {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    val startMillis = System.currentTimeMillis()

    val ticks = new ConcurrentHashMap[Long, Long]()
    val future = Source.fromIterator(() => Iterator.continually(1))
      .throttle(1000, per = 1.second, maximumBurst = 100, ThrottleMode.shaping)
      .take(1000)
      .runForeach(_ => {
        val bucket = (System.currentTimeMillis() - startMillis) / 10
        ticks.putIfAbsent(bucket, 0)
        ticks.computeIfPresent(bucket, (k, v) => v + 1)
      })

    Await.result(future, atMost = 1.hour)
    system.terminate()

    println(s"Elapsed: ${System.currentTimeMillis() - startMillis} ms, got ${ticks.asScala.to[SortedSet]}")
  }
}

I get

Elapsed: 1024 ms, got TreeSet((9,103), (12,26), (15,31), (18,25), (21,30), (24,32), (27,29), (30,29), (33,31), (36,30), (39,30), (42,29), (45,30), (48,31), (51,29), (54,32), (57,29), (60,29), (63,31), (66,29), (69,31), (72,29), (75,30), (78,31), (81,31), (84,29), (87,31), (90,29), (93,30), (96,30), (99,29), (102,5))

which means that every 30 ms there are ~30 elements going through the throttler

With

      .throttle(1000, per = 1.second, maximumBurst = 10, ThrottleMode.shaping)

I get

Elapsed: 2770 ms, got TreeSet((9,11), (13,12), (15,11), (18,12), (21,12), (24,11), (27,12), (30,11), (33,11), (36,11), (39,11), (42,11), (45,14), (48,11), (51,11), (54,11), (57,11), (60,11), (63,11), (66,11), (69,11), (72,11), (75,11), (78,11), (81,11), (84,11), (87,11), (90,12), (93,11), (96,11), (99,11), (102,11), (105,11), (108,11), (111,11), (114,11), (117,11), (120,11), (123,11), (126,11), (129,11), (132,11), (135,11), (138,11), (141,12), (144,11), (147,11), (150,11), (153,11), (156,11), (159,11), (162,11), (165,11), (168,12), (171,11), (174,11), (177,11), (180,11), (183,11), (186,11), (189,11), (192,11), (195,11), (198,11), (201,11), (204,11), (207,11), (210,11), (213,11), (216,11), (219,11), (222,11), (225,11), (228,11), (231,11), (234,11), (237,11), (240,11), (243,12), (246,11), (249,11), (252,11), (255,11), (258,11), (261,11), (264,11), (267,11), (270,11), (273,11), (276,10))

which means that every 30 ms there are ~11 elements going through the throttler.
Note that it took 2.7 seconds to complete, meaning that I don't get 1000 tokens per second.

With

      .throttle(1000, per = 1.second, maximumBurst = 1, ThrottleMode.shaping)

I get (trimmed to the first 3 seconds to not pollute the output)

Elapsed: 3096 ms, got TreeSet((9,1), (12,2), (15,2), (18,2), (21,2), (24,2), (27,2), (30,2), (33,2), (36,2), (39,2), (42,2), (45,2), (48,2), (51,2), (54,2), (57,2), (60,2), (63,2), (66,2), (69,2), (72,2), (75,2), (78,2), (81,2), (84,2), (87,2), (90,2), (93,2), (96,2), (99,2), (102,2), (105,2), (108,2), (111,2), (114,2), (117,2), (120,2), (123,2), (126,2), (129,2), (132,2), (135,2), (138,2), (141,2), (144,2), (147,2), (150,2), (152,2), (156,2), (159,2), (162,2), (165,2), (168,3), (171,2), (174,2), (177,2), (180,2), (183,2), (186,2), (189,2), (192,2), (195,3), (197,2), (201,2), (204,2), (207,2), (210,2), (212,2), (216,2), (219,2), (222,2), (225,2), (228,2), (230,2), (234,2), (237,2), (240,2), (243,2), (246,2), (249,2), (252,2), (255,2), (258,2), (261,2), (264,2), (267,2), (270,2), (273,3), (276,2), (279,2), (282,2), (285,2), (287,2), (291,2), (294,2), (297,2), (300,2), (303,2), (306,2), (309,2))

which means that every 30 ms there are 2 elements going through the throttler.
Note that it took 3+ seconds to complete, meaning that I don't get 1000 tokens per second.

So the bottom line is that to achieve 1000 QPS the maxBurst has to be at least 1000/30 ≈ 34 (roughly enough tokens to cover one ~30 ms interval), where 30 ms is an Akka-internal value which I probably should not rely on (?).
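
If that ~30 ms interval really is the limiting factor, a rough caller-side rule of thumb could look like this (just a sketch based on the measurements above; the 30 ms value is an internal detail, not documented, and minBurstFor is a made-up helper):

import scala.concurrent.duration._

// The burst must hold at least one ~30 ms interval's worth of tokens; otherwise the
// throttler cannot keep up with the requested rate. 30 ms is an assumption based on
// the measurements above, not a documented constant.
def minBurstFor(elements: Int, per: FiniteDuration, assumedTickMillis: Long = 30): Int = {
  val perTick = math.ceil(elements * assumedTickMillis / per.toMillis.toDouble).toInt
  math.max(1, perTick)
}

// minBurstFor(1000, 1.second) == 30, in the same ballpark as the ~34 estimated above.
// Source.fromIterator(() => Iterator.continually(1))
//   .throttle(1000, per = 1.second, maximumBurst = minBurstFor(1000, 1.second), ThrottleMode.shaping)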

@mmilewski
Author

Here is a case where burst > rate:

import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import scala.collection.JavaConverters._
import scala.collection.immutable.SortedSet
import scala.concurrent.Await
import scala.concurrent.duration._

object WhatsWithThrottling {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    val startMillis = System.currentTimeMillis()

    val ticks = new ConcurrentHashMap[Long, Long]()
    val future = Source.fromIterator(() => Iterator.continually(1))
      .dropWithin(4.seconds)
      .throttle(1000, per = 1.second, maximumBurst = 3000, ThrottleMode.shaping)
      .take(10000)
      .runForeach(_ => {
        val bucket = (System.currentTimeMillis() - startMillis) / 100
        ticks.putIfAbsent(bucket, 0)
        ticks.computeIfPresent(bucket, (k, v) => v + 1)
      })

    Await.result(future, atMost = 20.seconds)
    system.terminate()
    println(s"Elapsed: ${System.currentTimeMillis() - startMillis} ms, got ${ticks.asScala.to[SortedSet]}")
  }
}

I get

Elapsed: 11083 ms, got TreeSet((40,3018), (41,84), (42,90), (43,120), (44,90), (45,118), (46,91), (47,89), (48,121), (49,90), (50,91), (51,89), (52,122), (53,89), (54,91), (55,117), (56,91), (57,91), (58,120), (59,89), (60,119), (61,92), (62,89), (63,119), (64,91), (65,89), (66,122), (67,89), (68,90), (69,89), (70,120), (71,92), (72,119), (73,91), (74,88), (75,120), (76,91), (77,91), (78,119), (79,91), (80,89), (81,120), (82,91), (83,90), (84,119), (85,91), (86,89), (87,120), (88,90), (89,92), (90,118), (91,91), (92,90), (93,118), (94,92), (95,88), (96,120), (97,92), (98,90), (99,88), (100,121), (101,90), (102,119), (103,92), (104,88), (105,121), (106,92), (107,89), (108,119), (109,91), (110,88))

which means that after 4 seconds I get a burst of elements (3018) and then 90-120 elements every 100 ms, which gives roughly 1000/s. Actually they come at ~30 elements every 30 ms, but that's too much data to put here - you can change the bucket line to val bucket = ... / 10 to see that for yourself.

It's quite nice that the elements are spread over time.

@agolubev
Contributor

agolubev commented Aug 11, 2017

The reason for the confusion is the token bucket model that is implemented.

Throttle implements the token bucket model. There is a bucket with a given token capacity 
(burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be
`spared` for later use up to bucket capacity to allow some burstiness.

So for burst = 0 there is no bucket, and each time an event goes through, the system schedules a timer for up to 1 millisecond. The problem here is that the actual time span between scheduling the timer and the next push is about 30 milliseconds.

So for 1000 scheduled timers we end up waiting about 30 seconds.

The second drawback is that the bucket fills with tokens only if the time that has passed is worth more than one whole token (the time between two stream elements).

So I would recommend noting in the documentation that the bucket size must cover the ~30 millisecond scheduling delay.
It would also be good to count time that amounts to less than one token (the timespan between two elements) to increase accuracy.
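
A quick back-of-the-envelope check against the numbers from the original report, assuming that ~30 ms effective timer delay (an assumed value, not documented):

// With burst = 0 every element waits for one timer, so total time ≈ elements * 30 ms.
val assumedTimerMillis = 30
println(1000 * assumedTimerMillis)   // 30000 ms - close to the observed 30103 ms
println(5000 * assumedTimerMillis)   // 150000 ms - close to the observed 150093 ms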

@mmilewski
Author

mmilewski commented Aug 11, 2017 via email

@agolubev
Contributor

Throttling is heavily dependent on the SLA - so it's dangerous to vary the bucket size without the developer's approval.
My guess is that we need another method in the API that does not take a bucket size at all and just provides QPS with ~30 millisecond accuracy.

@agolubev
Contributor

@patriknw @johanandren do you think it would be good to have a throttle without the burst parameter (it looks like there should be something simple that can be used without understanding the bucket model)?
What's the general approach to timer delays? Are we documenting them or trying to compensate in code?

@patriknw
Copy link
Member

I think the throttler and the bucket model should be described in documentation, preferably with some illustrations and typical use cases.

We could introduce another ThrottleMode like ShapingAuto, if you think things can be adjusted automatically when the requested frequency / burst can't be met. I guess one would have to increase the burst size when the delay is < 30 millis.

When you say SLA and QPS, do you mean that it should not exceed the given frequency, and simply schedule with the given per interval and emit at most the given number of elements within that interval, not keeping any quota for the next interval?
E.g. instead of saying 1000, per = 1.second, maximumBurst = 50, one would say 50, per = 50.millis.
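
Spelled out, that would be something like the following (just a sketch of the idea; the burst argument is still required by the current signature, so it is set to the per-interval count in the second form):

import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

val source = Source.fromIterator(() => Iterator.continually(1))

// Current style: a per-second rate plus an explicit burst.
source.throttle(1000, per = 1.second, maximumBurst = 50, ThrottleMode.shaping)

// Alternative style: express the rate per small interval and keep no quota across
// intervals - whether this behaves better under the ~30 ms timer is exactly what
// is being discussed here.
source.throttle(50, per = 50.millis, maximumBurst = 50, ThrottleMode.shaping)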

@agolubev
Contributor

agolubev commented Aug 15, 2017

I guess one would have to increase the burst size when the delay is < 30 millis.

Yes, this is one way of doing it.

My guess is that developers setting 1000 events per 1 second (with burst = 0) expect that there can be a burst of up to 1000 events during each second.
I believe you are saying the same thing.

When you say SLA and QPS, do you mean that it should not exceed the given frequency and simply schedule with the given per interval, and at most emit the given number of elements within that interval.

@agolubev
Contributor

Regarding ShapingAuto: I think that as soon as we are supposed to adjust the burst size automatically, burstSize should be removed from the parameters.

@agolubev
Contributor

To sum up, we need to:

  • add an example and a good description of the token bucket throttle model to the documentation
  • add a throttle with ShapingAuto mode (without the bucket size as a parameter?)
  • make the existing throttle accumulate fractions of a token, not only whole tokens (currently, if a token is 100 ms and an event comes in at 150 ms, throttle believes it has only one token, not one and a half - see the sketch below)

did I miss anything?
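
A tiny illustration of the third point, using the 100 ms / 150 ms example (illustrative numbers only, not the actual throttle code):

val tokenIntervalMs = 100.0
val elapsedMs = 150.0

val wholeTokens = (elapsedMs / tokenIntervalMs).toInt   // 1   - what throttle counts today
val fractionalTokens = elapsedMs / tokenIntervalMs      // 1.5 - what it could accumulate
println(s"whole: $wholeTokens, fractional: $fractionalTokens")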

@patriknw
Member

Sounds good, it would be great if you could work on that @agolubev.
Thanks!

@patriknw patriknw added the 1 - triaged label and removed the 0 - new label Aug 20, 2017
@agolubev
Contributor

@patriknw, I'll take a look

@agolubev
Contributor

Ok, finally started this ticket.

@johanandren johanandren added the 3 - in progress label and removed the 1 - triaged label Oct 13, 2017
@patriknw patriknw removed the 3 - in progress label Nov 17, 2017
@patriknw patriknw added this to the 2.5.7 milestone Nov 17, 2017