// ratelimit.go
// Forked from envoyproxy/ratelimit.
package ratelimit
import (
"strings"
"sync"
"github.com/lyft/goruntime/loader"
"github.com/lyft/gostats"
pb "github.com/lyft/ratelimit/proto/envoy/service/ratelimit/v2"
"github.com/lyft/ratelimit/src/assert"
"github.com/lyft/ratelimit/src/config"
"github.com/lyft/ratelimit/src/redis"
logger "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
// shouldRateLimitStats holds the error counters incremented while serving
// ShouldRateLimit calls.
type shouldRateLimitStats struct {
	// redisError counts calls that failed with a redis.RedisError.
	redisError stats.Counter
	// serviceError counts calls rejected with a serviceError (empty domain,
	// empty descriptor list, or no configuration loaded).
	serviceError stats.Counter
}
// newShouldRateLimitStats creates the per-call error counters under the given
// stats scope.
func newShouldRateLimitStats(scope stats.Scope) shouldRateLimitStats {
	return shouldRateLimitStats{
		redisError:   scope.NewCounter("redis_error"),
		serviceError: scope.NewCounter("service_error"),
	}
}
// serviceStats groups the service-level stats: configuration load outcomes
// plus the per-call counters for the should_rate_limit endpoint.
type serviceStats struct {
	// configLoadSuccess is incremented on each successful runtime config reload.
	configLoadSuccess stats.Counter
	// configLoadError is incremented when the loader panics with a
	// config.RateLimitConfigError during reload.
	configLoadError stats.Counter
	// shouldRateLimit holds counters scoped under "call.should_rate_limit".
	shouldRateLimit shouldRateLimitStats
}
// newServiceStats creates the service-level counters in the given scope, with
// the should_rate_limit call counters nested under "call.should_rate_limit".
func newServiceStats(scope stats.Scope) serviceStats {
	return serviceStats{
		configLoadSuccess: scope.NewCounter("config_load_success"),
		configLoadError:   scope.NewCounter("config_load_error"),
		shouldRateLimit:   newShouldRateLimitStats(scope.Scope("call.should_rate_limit")),
	}
}
// RateLimitServiceServer is the gRPC rate limit service plus the accessors the
// server wiring needs: the currently loaded configuration snapshot and the
// legacy-protocol service adapter.
type RateLimitServiceServer interface {
	pb.RateLimitServiceServer
	// GetCurrentConfig returns the most recently loaded rate limit config.
	// It is nil until the first successful load completes.
	GetCurrentConfig() config.RateLimitConfig
	// GetLegacyService returns the adapter serving the legacy protocol.
	GetLegacyService() RateLimitLegacyServiceServer
}
// service implements RateLimitServiceServer. Configuration is loaded from the
// runtime snapshot and swapped wholesale under configLock.
type service struct {
	// runtime supplies config snapshots and update notifications.
	runtime loader.IFace
	// configLock guards config; readers take RLock, reloadConfig takes Lock.
	configLock   sync.RWMutex
	configLoader config.RateLimitConfigLoader
	config       config.RateLimitConfig
	// runtimeUpdateEvent is signaled by the runtime loader when underlying
	// files change; a background goroutine then reloads config.
	runtimeUpdateEvent chan int
	// cache performs the actual limit checks.
	cache redis.RateLimitCache
	stats serviceStats
	// rlStatsScope is the "rate_limit" scope handed to the config loader.
	rlStatsScope stats.Scope
	legacy       *legacyService
}
// reloadConfig rebuilds the rate limit configuration from the current runtime
// snapshot and swaps it in under the write lock. A RateLimitConfigError panic
// raised by the loader is converted into a counter bump plus an error log;
// any other panic is re-raised.
func (this *service) reloadConfig() {
	defer func() {
		if e := recover(); e != nil {
			configError, ok := e.(config.RateLimitConfigError)
			if !ok {
				panic(e)
			}
			this.stats.configLoadError.Inc()
			logger.Errorf("error loading new configuration from runtime: %s", configError.Error())
		}
	}()

	// Collect only the runtime keys that carry rate limit configuration.
	snapshot := this.runtime.Snapshot()
	files := []config.RateLimitConfigToLoad{}
	for _, key := range snapshot.Keys() {
		if strings.HasPrefix(key, "config.") {
			files = append(files, config.RateLimitConfigToLoad{key, snapshot.Get(key)})
		}
	}

	newConfig := this.configLoader.Load(files, this.rlStatsScope)
	this.stats.configLoadSuccess.Inc()

	this.configLock.Lock()
	defer this.configLock.Unlock()
	this.config = newConfig
}
// serviceError is a request-validation error raised via panic inside the
// rate limit call path and translated to a gRPC error by ShouldRateLimit.
type serviceError string

// Error returns the message the serviceError was created with.
func (e serviceError) Error() string {
	return string(e)
}

// checkServiceErr panics with serviceError(msg) when the condition is false;
// otherwise it is a no-op.
func checkServiceErr(cond bool, msg string) {
	if cond {
		return
	}
	panic(serviceError(msg))
}
// shouldRateLimitWorker validates the request, resolves a limit for each
// descriptor from the current config snapshot, runs them through the cache,
// and assembles the response. Validation failures panic with serviceError and
// are translated by the caller.
func (this *service) shouldRateLimitWorker(
	ctx context.Context, request *pb.RateLimitRequest) *pb.RateLimitResponse {

	checkServiceErr(request.Domain != "", "rate limit domain must not be empty")
	checkServiceErr(len(request.Descriptors) != 0, "rate limit descriptor list must not be empty")

	snappedConfig := this.GetCurrentConfig()
	checkServiceErr(snappedConfig != nil, "no rate limit configuration loaded")

	// One limit lookup per descriptor; entries may be nil when no limit applies.
	limitsToCheck := make([]*config.RateLimit, len(request.Descriptors))
	for i, descriptor := range request.Descriptors {
		limitsToCheck[i] = snappedConfig.GetLimit(ctx, request.Domain, descriptor)
	}

	descriptorStatuses := this.cache.DoLimit(ctx, request, limitsToCheck)
	assert.Assert(len(limitsToCheck) == len(descriptorStatuses))

	// Overall code is OVER_LIMIT if any descriptor is over limit, else OK.
	response := &pb.RateLimitResponse{
		Statuses:    make([]*pb.RateLimitResponse_DescriptorStatus, len(request.Descriptors)),
		OverallCode: pb.RateLimitResponse_OK,
	}
	for i, status := range descriptorStatuses {
		response.Statuses[i] = status
		if status.Code == pb.RateLimitResponse_OVER_LIMIT {
			response.OverallCode = status.Code
		}
	}
	return response
}
// ShouldRateLimit is the gRPC entry point. It delegates to
// shouldRateLimitWorker and converts the two known panic types
// (redis.RedisError, serviceError) into counted gRPC errors; any other panic
// propagates. The recover call must remain directly inside the deferred
// function for it to take effect.
func (this *service) ShouldRateLimit(
	ctx context.Context,
	request *pb.RateLimitRequest) (finalResponse *pb.RateLimitResponse, finalError error) {

	defer func() {
		err := recover()
		if err == nil {
			return
		}

		logger.Debugf("caught error during call")
		finalResponse = nil
		switch t := err.(type) {
		case redis.RedisError:
			this.stats.shouldRateLimit.redisError.Inc()
			finalError = t
		case serviceError:
			this.stats.shouldRateLimit.serviceError.Inc()
			finalError = t
		default:
			panic(err)
		}
	}()

	response := this.shouldRateLimitWorker(ctx, request)
	logger.Debugf("returning normal response")
	return response, nil
}
// GetLegacyService returns the legacy-protocol adapter wired up in NewService.
func (this *service) GetLegacyService() RateLimitLegacyServiceServer {
	return this.legacy
}
// GetCurrentConfig returns the currently loaded configuration under the read
// lock. The returned value is a snapshot reference: reloadConfig replaces the
// field wholesale rather than mutating it in place.
func (this *service) GetCurrentConfig() config.RateLimitConfig {
	this.configLock.RLock()
	current := this.config
	this.configLock.RUnlock()
	return current
}
// NewService constructs the rate limit service: it creates the stats
// counters, builds the legacy-protocol adapter, registers for runtime update
// notifications, performs an initial synchronous config load (which panics on
// non-config errors), and starts a goroutine that reloads configuration on
// every runtime update event.
//
// NOTE(review): the "stats" parameter shadows the stats package; the type
// names in the signature are resolved before the shadowing takes effect, but
// a rename would improve readability.
func NewService(runtime loader.IFace, cache redis.RateLimitCache,
	configLoader config.RateLimitConfigLoader, stats stats.Scope) RateLimitServiceServer {

	newService := &service{
		runtime:      runtime,
		configLock:   sync.RWMutex{},
		configLoader: configLoader,
		// config stays nil until the reloadConfig call below populates it.
		config:             nil,
		runtimeUpdateEvent: make(chan int),
		cache:              cache,
		stats:              newServiceStats(stats),
		rlStatsScope:       stats.Scope("rate_limit"),
	}
	newService.legacy = &legacyService{
		s:                          newService,
		shouldRateLimitLegacyStats: newShouldRateLimitLegacyStats(stats),
	}

	// Register for update notifications, then do the first load.
	runtime.AddUpdateCallback(newService.runtimeUpdateEvent)
	newService.reloadConfig()

	// Background reloader: blocks on the update channel and reloads config on
	// each signal. It has no shutdown path.
	go func() {
		// No exit right now.
		for {
			logger.Debugf("waiting for runtime update")
			<-newService.runtimeUpdateEvent
			logger.Debugf("got runtime update and reloading config")
			newService.reloadConfig()
		}
	}()

	return newService
}