/
rate_limiter.go
222 lines (201 loc) · 7.99 KB
/
rate_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package ratelimit
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/0xProject/0x-mesh/meshdb"
"github.com/benbjohnson/clock"
log "github.com/sirupsen/logrus"
"github.com/syndtr/goleveldb/leveldb"
"golang.org/x/time/rate"
)
const (
// maxRequestsPer24HrsBuffer is the buffer subtracted from the operator supplied
// maxRequestsPer24Hrs. This buffer helps ensure that we don't overstep the desired
// max number of requests.
maxRequestsPer24HrsBuffer = 1000
lowestPossibleMaxRequestsPer24Hrs = 40000
)
// RateLimiter is the interface one must satisfy to be considered a RateLimiter
type RateLimiter interface {
Wait(ctx context.Context) error
Start(ctx context.Context, checkpointInterval time.Duration) error
getCurrentUTCCheckpoint() time.Time
getGrantedInLast24hrsUTC() int
}
// rateLimiter is a rate-limiter for requests
type rateLimiter struct {
maxRequestsPer24Hrs int
twentyFourHourLimiter *rate.Limiter
perSecondLimiter *rate.Limiter
currentUTCCheckpoint time.Time // Start of current UTC 24hr period
grantedInLast24hrsUTC int // Number of granted requests issued in last 24hr UTC
meshDB *meshdb.MeshDB
aClock clock.Clock
wasStartedOnce bool // Whether the rate limiter has previously been started
startMutex sync.Mutex // Mutex around the start check
mu sync.Mutex
}
// New instantiates a new RateLimiter
func New(maxRequestsPer24HrsWithoutBuffer int, maxRequestsPerSecond float64, meshDB *meshdb.MeshDB, aClock clock.Clock) (RateLimiter, error) {
if maxRequestsPer24HrsWithoutBuffer < lowestPossibleMaxRequestsPer24Hrs {
return nil, fmt.Errorf("EthereumRPCMaxRequestsPer24HrUTC too low. Should be at least %d", lowestPossibleMaxRequestsPer24Hrs)
}
// Reduce the requested maxRequestsPer24Hrs by maxRequestsPer24HrsBuffer out of extra precaution
maxRequestsPer24Hrs := maxRequestsPer24HrsWithoutBuffer - maxRequestsPer24HrsBuffer
metadata, err := meshDB.GetMetadata()
if err != nil {
return nil, err
}
// Check if stored checkpoint in DB is still relevant
now := aClock.Now()
currentUTCCheckpoint := getUTCMidnightOfDate(now)
storedUTCCheckpoint := metadata.StartOfCurrentUTCDay
storedGrantedInLast24HrsUTC := metadata.EthRPCRequestsSentInCurrentUTCDay
// Update DB if current values are from previous 24hr period and therefore no longer relevant
if currentUTCCheckpoint != storedUTCCheckpoint {
storedUTCCheckpoint = currentUTCCheckpoint
storedGrantedInLast24HrsUTC = 0
if err := meshDB.UpdateMetadata(func(metadata meshdb.Metadata) meshdb.Metadata {
metadata.StartOfCurrentUTCDay = storedUTCCheckpoint
metadata.EthRPCRequestsSentInCurrentUTCDay = storedGrantedInLast24HrsUTC
return metadata
}); err != nil {
return nil, err
}
}
// Compute the number of grants accrued since 12am UTC that have not been used. We will than
// instantiate the rate limiter to start with the accrued grants remaining available for
// immediate use
timePassedSinceCheckpoint := aClock.Since(currentUTCCheckpoint)
// Translate time passed into theoretical # grants accrued
// (timePassed / 24hrs) * maxRequestsPer24hrs
theoreticalGrantsAccrued := int((float64(timePassedSinceCheckpoint.Nanoseconds()) / float64((24 * time.Hour).Nanoseconds())) * float64(maxRequestsPer24Hrs))
// theoreticalGrants - storedGrantedInLast24HrsUTC = accruedGrants
accruedGrants := theoreticalGrantsAccrued - storedGrantedInLast24HrsUTC
// Instantiate limiter with `maxRequestsPer24Hrs` bucketsize and a limit
// that results in `maxRequestsPer24Hrs` requests being whitelisted in a 24hr period
limit := rate.Limit(float64(maxRequestsPer24Hrs) / (24 * 60 * 60))
twentyFourHourLimiter := rate.NewLimiter(limit, maxRequestsPer24Hrs)
// Since Limiter begins initially full, we drain it before use. i.e., We do not want 100k
// requests to already be queued up, instead we only want the number of accrued grants that
// have gone unused to be available at startup
amountToDrain := maxRequestsPer24Hrs - accruedGrants
ctx := context.Background()
err = twentyFourHourLimiter.WaitN(ctx, amountToDrain)
if err != nil {
return nil, err
}
// Instantiate limiter with a bucketsize of one and a limit that results
// in no more than `maxRequestsPerSecond` requests per second.
limit = rate.Limit(maxRequestsPerSecond)
perSecondLimiter := rate.NewLimiter(limit, 1)
return &rateLimiter{
aClock: aClock,
maxRequestsPer24Hrs: maxRequestsPer24Hrs,
twentyFourHourLimiter: twentyFourHourLimiter,
perSecondLimiter: perSecondLimiter,
meshDB: meshDB,
currentUTCCheckpoint: storedUTCCheckpoint,
grantedInLast24hrsUTC: storedGrantedInLast24HrsUTC,
}, nil
}
// Start starts two background processes required for the RateLimiter to function. One that
// stores it's state to the DB at a checkpoint interval, and another that clears accrued
// grants when the UTC day time window elapses.
func (r *rateLimiter) Start(ctx context.Context, checkpointInterval time.Duration) error {
r.startMutex.Lock()
if r.wasStartedOnce {
r.startMutex.Unlock()
return errors.New("Can only start RateLimiter once per instance")
}
r.wasStartedOnce = true
r.startMutex.Unlock()
// Start 24hr UTC accrued grants resetter
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
defer wg.Done()
for {
now := r.aClock.Now()
currentUTCCheckpoint := getUTCMidnightOfDate(now)
nextUTCCheckpoint := time.Date(currentUTCCheckpoint.Year(), currentUTCCheckpoint.Month(), currentUTCCheckpoint.Day()+1, 0, 0, 0, 0, time.UTC)
untilNextUTCCheckpoint := nextUTCCheckpoint.Sub(r.aClock.Now())
select {
case <-ctx.Done():
return
case <-r.aClock.After(untilNextUTCCheckpoint):
// Compute how many grants have accrued and gone unused and remove that
// many from the bucket so that it starts empty for the next 24hr period
r.mu.Lock()
accruedGrants := r.maxRequestsPer24Hrs - r.grantedInLast24hrsUTC
if err := r.twentyFourHourLimiter.WaitN(ctx, accruedGrants); err != nil {
// Since we never set n to exceed the burst size, an error will only
// occur if the context is cancelled or it's deadline is exceeded. In
// these cases, we simply return so that this go-routine exits.
// From docs: "It returns an error if n exceeds the Limiter's burst
// size, the Context is canceled, or the expected wait time exceeds the
// Context's Deadline."
// Source: https://godoc.org/golang.org/x/time/rate#Limiter.WaitN
r.mu.Unlock()
return
}
r.currentUTCCheckpoint = nextUTCCheckpoint
r.grantedInLast24hrsUTC = 0
r.mu.Unlock()
}
}
}()
ticker := time.NewTicker(checkpointInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
wg.Wait()
return nil
case <-ticker.C:
// Store grants issued and current UTC checkpoint to DB
r.mu.Lock()
err := r.meshDB.UpdateMetadata(func(metadata meshdb.Metadata) meshdb.Metadata {
metadata.StartOfCurrentUTCDay = r.currentUTCCheckpoint
metadata.EthRPCRequestsSentInCurrentUTCDay = r.grantedInLast24hrsUTC
return metadata
})
r.mu.Unlock()
if err != nil {
if err == leveldb.ErrClosed {
// We can't continue if the database is closed. Stop the rateLimiter and
// return an error.
ticker.Stop()
wg.Wait()
return err
}
log.WithError(err).Error("rateLimiter.Start() error encountered while updating metadata in DB")
}
}
}
}
// Wait blocks until the rateLimiter allows for another request to be sent
func (r *rateLimiter) Wait(ctx context.Context) error {
if err := r.twentyFourHourLimiter.Wait(ctx); err != nil {
return err
}
if err := r.perSecondLimiter.Wait(ctx); err != nil {
return err
}
r.mu.Lock()
r.grantedInLast24hrsUTC++
r.mu.Unlock()
return nil
}
func (r *rateLimiter) getCurrentUTCCheckpoint() time.Time {
return r.currentUTCCheckpoint
}
func (r *rateLimiter) getGrantedInLast24hrsUTC() int {
return r.grantedInLast24hrsUTC
}
func getUTCMidnightOfDate(date time.Time) time.Time {
return time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
}