-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
ratelimit.go
95 lines (81 loc) · 2.11 KB
/
ratelimit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package ratelimit
import (
"context"
"errors"
"time"
"golang.org/x/time/rate"
"github.com/gotd/td/bin"
"github.com/gotd/td/clock"
"github.com/gotd/td/telegram"
"github.com/gotd/td/tg"
)
// RateLimiter is a tg.Invoker that throttles RPC calls on underlying invoker.
type RateLimiter struct {
clock clock.Clock
lim *rate.Limiter
}
// New returns a new invoker rate limiter using lim.
func New(r rate.Limit, b int) *RateLimiter {
return &RateLimiter{
clock: clock.System,
lim: rate.NewLimiter(r, b),
}
}
// clone returns a copy of the RateLimiter.
func (l *RateLimiter) clone() *RateLimiter {
return &RateLimiter{
clock: l.clock,
lim: l.lim,
}
}
// WithClock sets clock to use. Default is to use system clock.
func (l *RateLimiter) WithClock(c clock.Clock) *RateLimiter {
l = l.clone()
l.clock = c
return l
}
// wait blocks until rate limiter permits an event to happen. It returns an error if
// limiter’s burst size is misconfigured, the Context is canceled, or the expected
// wait time exceeds the Context’s Deadline.
func (l *RateLimiter) wait(ctx context.Context) error {
// Check if ctx is already canceled.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
now := l.clock.Now()
r := l.lim.ReserveN(now, 1)
if !r.OK() {
// Limiter requires n <= lim.burst for each reservation.
return errors.New("limiter's burst size must be greater than zero")
}
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
// Bail out earlier if we exceed context deadline. Note that
// contexts use system time instead of mockable clock.
deadline, ok := ctx.Deadline()
if ok && delay > time.Until(deadline) {
return context.DeadlineExceeded
}
t := l.clock.Timer(delay)
defer clock.StopTimer(t)
select {
case <-t.C():
return nil
case <-ctx.Done():
r.CancelAt(l.clock.Now())
return ctx.Err()
}
}
// Handle implements telegram.Middleware.
func (l *RateLimiter) Handle(next tg.Invoker) telegram.InvokeFunc {
return func(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
if err := l.wait(ctx); err != nil {
return err
}
return next.Invoke(ctx, input, output)
}
}