/
Throttle.scala
121 lines (104 loc) · 4.6 KB
/
Throttle.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package org.http4s.server.middleware
import org.http4s.{Http, Response, Status}
import cats.data.Kleisli
import cats.effect.{Clock, Sync}
import cats.effect.concurrent.Ref
import scala.concurrent.duration.FiniteDuration
import cats.implicits._
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
/**
* Transform a service to reject any calls the go over a given rate.
*/
object Throttle {
sealed abstract class TokenAvailability extends Product with Serializable
case object TokenAvailable extends TokenAvailability
final case class TokenUnavailable(retryAfter: Option[FiniteDuration]) extends TokenAvailability
/**
* A token bucket for use with the [[Throttle]] middleware. Consumers can take tokens which will be refilled over time.
* Implementations are required to provide their own refill mechanism.
*
* Possible implementations include a remote TokenBucket service to coordinate between different application instances.
*/
trait TokenBucket[F[_]] {
def takeToken: F[TokenAvailability]
}
object TokenBucket {
/**
* Creates an in-memory [[TokenBucket]].
*
* @param capacity the number of tokens the bucket can hold and starts with.
* @param refillEvery the frequency with which to add another token if there is capacity spare.
* @return A task to create the [[TokenBucket]].
*/
def local[F[_]](capacity: Int, refillEvery: FiniteDuration)(
implicit F: Sync[F],
clock: Clock[F]): F[TokenBucket[F]] = {
def getTime = clock.monotonic(NANOSECONDS)
val bucket = getTime.flatMap(time => Ref[F].of((capacity.toDouble, time)))
bucket.map { counter =>
new TokenBucket[F] {
override def takeToken: F[TokenAvailability] = {
val attemptUpdate = counter.access.flatMap {
case ((previousTokens, previousTime), setter) =>
getTime.flatMap(currentTime => {
val timeDifference = currentTime - previousTime
val tokensToAdd = timeDifference.toDouble / refillEvery.toNanos.toDouble
val newTokenTotal = Math.min(previousTokens + tokensToAdd, capacity.toDouble)
val attemptSet: F[Option[TokenAvailability]] = if (newTokenTotal >= 1) {
setter((newTokenTotal - 1, currentTime))
.map(_.guard[Option].as(TokenAvailable))
} else {
val timeToNextToken = refillEvery.toNanos - timeDifference
val successResponse = TokenUnavailable(timeToNextToken.nanos.some)
setter((newTokenTotal, currentTime)).map(_.guard[Option].as(successResponse))
}
attemptSet
})
}
def loop: F[TokenAvailability] = attemptUpdate.flatMap { attempt =>
attempt.fold(loop)(token => token.pure[F])
}
loop
}
}
}
}
}
/**
* Limits the supplied service to a given rate of calls using an in-memory [[TokenBucket]]
*
* @param amount the number of calls to the service to permit within the given time period.
* @param per the time period over which a given number of calls is permitted.
* @param http the service to transform.
* @return a task containing the transformed service.
*/
def apply[F[_], G[_]](amount: Int, per: FiniteDuration)(
http: Http[F, G])(implicit F: Sync[F], timer: Clock[F]): F[Http[F, G]] = {
val refillFrequency = per / amount.toLong
val createBucket = TokenBucket.local(amount, refillFrequency)
createBucket.map(bucket => apply(bucket, defaultResponse[G] _)(http))
}
def defaultResponse[F[_]](retryAfter: Option[FiniteDuration]): Response[F] = {
val _ = retryAfter
Response[F](Status.TooManyRequests)
}
/**
* Limits the supplied service using a provided [[TokenBucket]]
*
* @param bucket a [[TokenBucket]] to use to track the rate of incoming requests.
* @param throttleResponse a function that defines the response when throttled, may be supplied a suggested retry time depending on bucket implementation.
* @param http the service to transform.
* @return a task containing the transformed service.
*/
def apply[F[_], G[_]](
bucket: TokenBucket[F],
throttleResponse: Option[FiniteDuration] => Response[G] = defaultResponse[G] _)(
http: Http[F, G])(implicit F: Sync[F]): Http[F, G] =
Kleisli { req =>
bucket.takeToken.flatMap {
case TokenAvailable => http(req)
case TokenUnavailable(retryAfter) => throttleResponse(retryAfter).pure[F]
}
}
}