
Initial implementation of Throttle middleware #2042

Merged 2 commits into http4s:master on Sep 11, 2018

Conversation

@keirlawson (Contributor) commented Sep 2, 2018

Feedback appreciated!

There is definitely room for more features. In a subsequent PR I'd like to make taking tokens a function of the Request, so that throttling can be based on request sizes or IPs rather than just the number of requests. I'd also like to enable customisation of the response to throttled requests. If these are considered essential I can add them to this PR; however, my preference would be to get this merged before iterating further.

import cats.implicits._
import fs2.Stream

sealed trait TokenAvailability
@rossabaker (Member) commented Sep 3, 2018

sealed abstract class TokenAvailability extends Product with Serializable: noisier, but better inference and binary compatibility.

This is dumping a bunch of symbols into the middleware package. I wonder whether this ADT, and the TokenBucket, should be moved inside Throttle. Or maybe, in the opposite direction, it could be useful on a Client, and we should move it into org.http4s.util. 🤔

@SystemFw (Member) commented Sep 3, 2018

Slight preference for having it in Throttle and not util for the time being.

@rossabaker (Member) commented Sep 3, 2018

But move Throttle to another package or leave it here? I agree with moving the ADT into Throttle regardless.

org.http4s.util is a cursed place of "things that make us wish that Scala were more amenable to microlibraries," so I do acknowledge the disgust.

@SystemFw (Member) commented Sep 3, 2018

I won't oppose moving into util once we have the need. For now I'd rather keep it here so that the code is easier to follow.

On a related note re: reusability, client-side limiting IMHO is not a great fit for a token bucket; upperbound (which focuses on that use case) has a different design for that reason.

implicit F: Concurrent[F],
timer: Timer[F]): Stream[F, TokenBucket[F]] =
Stream.eval(Ref[F].of(capacity)).flatMap { counter =>
val refill = Stream
@rossabaker (Member) commented Sep 3, 2018

I've implemented this previously by storing the last time a token was requested. Then you can interpolate out how much has been refilled from last time before deciding whether or not to grant a token, and not need a background process. This approach results in fractional tokens.

I think we'd also then get away with an F[TokenBucket[F]] instead of a Stream[F, TokenBucket[F]].
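The lazily-refilled bucket described above can be sketched in plain Scala. This is a minimal illustration of the interpolation idea only: the class name, the use of `synchronized` mutable state (instead of cats-effect's `Ref`), and `System.nanoTime()` (instead of `Timer`/`Clock`) are all assumptions for the sketch, not the PR's actual code.

```scala
import scala.concurrent.duration._

// Sketch of a lazily-refilled token bucket: instead of running a
// background refill process, record when tokens were last computed and
// interpolate how many (possibly fractional) tokens accrued since then.
final class LazyBucket(capacity: Int, refillEvery: FiniteDuration) {
  private var tokens: Double = capacity.toDouble // fractional tokens
  private var lastNanos: Long = System.nanoTime()

  def tryTake(): Boolean = synchronized {
    val now = System.nanoTime()
    // Tokens accrued since the last request, capped at capacity.
    val accrued = (now - lastNanos).toDouble / refillEvery.toNanos.toDouble
    tokens = math.min(tokens + accrued, capacity.toDouble)
    lastNanos = now
    if (tokens >= 1) { tokens -= 1; true } else false
  }
}
```

Because the refill is computed on demand, no `Stream` is needed to drive it, which is what makes the `F[TokenBucket[F]]` return type possible.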

@SystemFw (Member) commented Sep 3, 2018

I think we'd also then get away with an F[TokenBucket[F]] instead of a Stream[F, TokenBucket[F]].

We can do this regardless, btw: return a Resource[F, TokenBucket[F]], using compile.drain.start for acquisition and cancel as the shutdown action.

You mentioned the refill approach a couple times but never with enough detail to understand how you did it, so this is interesting :)

@SystemFw (Member) commented Sep 3, 2018

Just out of curiosity, it wouldn't work with a TokenBucket that had the task waiting rather than rejected, right? (if more recent requests don't arrive, old tasks are stuck)

@rossabaker (Member) commented Sep 3, 2018

I think my rejection message included a "retry-after" like value. I don't think I maintained a queue. In either case, we can compute the time until full and try again. It does get messy if we're returning the same retry-after to multiple requestors and get slammed.
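The "retry-after"-like value can be interpolated from the same state. A hedged sketch under the same assumptions as the fractional-token idea (the function name and shape are hypothetical, not the PR's code):

```scala
import scala.concurrent.duration._

// Time until the bucket is full again, given the current fractional
// token count: each missing token takes one refill interval to accrue.
def timeToFull(tokens: Double, capacity: Int, refillEvery: FiniteDuration): FiniteDuration = {
  val missing = capacity.toDouble - tokens
  (missing * refillEvery.toNanos).toLong.nanos
}
```

As the comment above notes, returning the same value to many rejected requestors invites a thundering herd when it elapses.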

* @param http the service to transform.
* @return a singleton stream containing the transformed service.
*/
def apply[F[_], G[_]](bucket: TokenBucket[F])(http: Http[F, G])(
@rossabaker (Member) commented Sep 3, 2018

Should we make the 429 response customizable?

@rossabaker (Member) commented Sep 3, 2018

Now that I mention Retry-After above, hey, how about a Retry-After? 😄

@keirlawson (Contributor, Author) commented Sep 4, 2018

As I said in the PR message, I want to implement this; I was just planning to do so in a follow-up PR. But if the feeling is that it's essential, I can do it in this one.

@SystemFw (Member) commented Sep 4, 2018

I think customisable responses are really easy to add here, and perhaps the rest can go in another PR?

@keirlawson (Contributor, Author) commented Sep 8, 2018

Rebased on latest master, swapped to @rossabaker's solution for refilling the bucket, added response customisation, and set retry-after to the time at which the bucket ought to be full.

I'm not totally convinced by the retry-after value. You could argue it should be when the next token will be available rather than when the bucket should be full, though in the real world, if clients actually pay attention to it, things will likely blow up. It seems to me that neither the next-token time nor the bucket-full time really makes sense, and it would be best to leave it up to the user, since any value is either arbitrary or app-specific.

val newTokenTotal = Math.min(previousTokens + tokensToAdd, capacity.toDouble)

val attemptSet: F[Option[TokenAvailability]] = if (newTokenTotal >= 1) {
setter((newTokenTotal - 1, currentTime)).map(success => if (success) Some(TokenAvailable()) else None)
@SystemFw (Member) commented Sep 8, 2018

map(success => if (success) Some(TokenAvailable()) else None)

You can write this as map(_.guard[Option].as(TokenAvailable())) if you wish

} else {
val timeToFull = (refillEvery.toNanos * capacity) - timeDifference
val successResponse = TokenUnavailable(Some(FiniteDuration(timeToFull, NANOSECONDS)))
setter((newTokenTotal, currentTime)).map(success => if (success) Some(successResponse) else None)
@SystemFw (Member) commented Sep 8, 2018

TokenUnavailable(Some(FiniteDuration(timeToFull, NANOSECONDS)))

apologies for the bikeshedding (feel free to ignore)

TokenUnavailable(timeToFull.nanos.some)

@keirlawson (Contributor, Author) commented Sep 8, 2018

Bikeshedding appreciated, I had the feeling there would be a more concise way to do this...


sealed abstract class TokenAvailability extends Product with Serializable
case class TokenAvailable() extends TokenAvailability
case class TokenUnavailable(retryAfter: Option[FiniteDuration]) extends TokenAvailability
@SystemFw (Member) commented Sep 8, 2018

Is this Option ever None?

@keirlawson (Contributor, Author) commented Sep 8, 2018

It could be with other TokenBucket impls; e.g. I envisage an "outstanding requests" version where you issue tokens when there are fewer than n requests currently being executed. In that scenario there is no obvious time-to-full.

@rossabaker (Member) left a comment

Thanks. This is looking real close.


object TokenBucket {
sealed abstract class TokenAvailability extends Product with Serializable
case class TokenAvailable() extends TokenAvailability
@rossabaker (Member) commented Sep 9, 2018

Why did this cease to be a case object?

object TokenBucket {
sealed abstract class TokenAvailability extends Product with Serializable
case class TokenAvailable() extends TokenAvailability
case class TokenUnavailable(retryAfter: Option[FiniteDuration]) extends TokenAvailability
@rossabaker (Member) commented Sep 9, 2018

Any reason not to make this a final case class?

*/
def apply[F[_], G[_]](amount: Int, per: FiniteDuration)(
http: Http[F, G])(implicit F: Concurrent[F], timer: Timer[F]): Stream[F, Http[F, G]] = {
http: Http[F, G])(implicit F: Sync[F], timer: Clock[F]): F[Http[F, G]] = {
@rossabaker (Member) commented Sep 9, 2018

This is kind of confusing: the types are the same as TokenBucket.local, but we do extra math first.

I wonder whether this overload carries its weight at all, given that it doesn't allow customization of the throttled response. I imagine we could live without it, but I'd defer to you since you've probably tried to use this and I haven't.

@keirlawson (Contributor, Author) commented Sep 9, 2018

I'm not totally attached, but my thinking was that some people won't care about the token bucket impl, so it's easier to have a default where you don't need to construct your own. Also, "x requests per y time" is a more natural way to think about a rate limit, i.e. "100 per second" rather than "1 per 0.01 seconds".
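The "extra math" the overload does is essentially converting the natural "amount per duration" rate into a per-token refill interval. A hedged sketch of that conversion (the function name is hypothetical; the PR's actual signature may differ):

```scala
import scala.concurrent.duration._

// Convert "amount requests per `per`" into the interval at which a
// single token refills, e.g. 100 per second -> one token every 10 ms.
// Integer division of nanoseconds is exact enough for illustration.
def refillInterval(amount: Int, per: FiniteDuration): FiniteDuration =
  (per.toNanos / amount).nanos
```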

@SystemFw (Member) commented Sep 9, 2018

I'm very slightly 👎 on the overload

@rossabaker (Member) left a comment

I'm still mildly 👎 on the overload, but strongly 👍 on the rest. I'll defer to a second reviewer on the overload and give it my blessing.

Thanks, this is going to be a welcome addition.

@keirlawson (Contributor, Author) commented Sep 10, 2018

Per the discussion in the Gitter channel, I have changed the behavior to not use the Retry-After header by default. However, the time-to-next-token value is made available to the throttleResponse function should someone wish to use it, and time-to-bucket-full should be derivable from this plus the refill rate.

case ((previousTokens, previousTime), setter) =>
getTime.flatMap(currentTime => {
val timeDifference = currentTime - previousTime
val tokensToAdd = timeDifference.toDouble / refillEvery.toNanos.toDouble
@jmcardon (Member) commented Sep 11, 2018

Why are you doing a toDouble conversion just for the sake of using Math.min?

@keirlawson (Contributor, Author) commented Sep 11, 2018

tokensToAdd needs to allow for fractional tokens; if I remove both toDoubles from that line it will wind up a Long.
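A quick illustration of why the toDouble matters: with Long division, any elapsed time shorter than one refill interval truncates to zero tokens, so sub-token progress is silently discarded. The values below are made up for the example.

```scala
// 5 ms elapsed against a 10 ms refill interval should accrue half a token.
val elapsedNanos: Long = 5000000L   // 5 ms
val refillNanos: Long  = 10000000L  // 10 ms

val truncated: Long     = elapsedNanos / refillNanos                  // Long division: 0
val fractional: Double  = elapsedNanos.toDouble / refillNanos.toDouble // 0.5
```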

clock: Clock[F]): F[TokenBucket[F]] = {

def getTime = clock.monotonic(NANOSECONDS)
val bucket = getTime.flatMap(time => Ref[F].of((capacity.toDouble, time)))
@jmcardon (Member) commented Sep 11, 2018

Why is this a Double rather than a Long? You'll essentially be working with integral amounts most of the time.

@keirlawson (Contributor, Author) commented Sep 11, 2018

If we want to allow fractional tokens, which is the basis of the algorithm suggested by @rossabaker, then I think we need a Double? Unless I am missing something?

@SystemFw SystemFw merged commit 1c2e4c0 into http4s:master Sep 11, 2018
2 checks passed