/
lazy-sync.go
204 lines (172 loc) · 5.01 KB
/
lazy-sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package lazysync
import (
	"context"
	"errors"
	"sync"
	"time"

	"google.golang.org/protobuf/proto"

	"github.com/fluxninja/aperture/v2/pkg/config"
	ratelimiter "github.com/fluxninja/aperture/v2/pkg/dmap-funcs/rate-limiter"
	"github.com/fluxninja/aperture/v2/pkg/jobs"
)
// counter tracks the locally cached rate-limit state for a single label.
// It lets takeN grant or deny requests without a remote call until the
// next scheduled sync. All fields are guarded by lock.
type counter struct {
	// nextSync is when the counter must next reconcile with the remote limiter.
	nextSync time.Time
	// credit is the token delta accumulated locally since the last sync
	// (positive for takes not yet flushed, negative for returns).
	credit float64
	// current is the token count last reported by the remote limiter.
	current float64
	// available is the remaining token count last reported by the remote limiter.
	available float64
	// lock serializes all access to this counter.
	lock sync.Mutex
	// waiting is true when the remote limiter reported a non-zero wait time,
	// i.e. tokens are known to be exhausted until nextSync.
	waiting bool
}
// LazySyncRateLimiter is a limiter that syncs its state lazily with another limiter.
// It serves most decisions from per-label counters cached in memory and only
// contacts the wrapped limiter on the configured sync schedule.
type LazySyncRateLimiter struct {
	// counters maps label (string) -> *counter; entries are created lazily
	// in takeN and evicted by the periodic audit job when stale.
	counters sync.Map
	// limiter is the underlying (authoritative) rate limiter being wrapped.
	limiter ratelimiter.RateLimiter
	// jobGroup runs the periodic audit job.
	jobGroup *jobs.JobGroup
	// name is the wrapped limiter's name with a "-lazy-sync" suffix.
	name string
	// interval is the audit job's execution period and the staleness horizon.
	interval time.Duration
	// syncInterval is interval/numSync — how long a counter's cached state
	// is trusted before the next remote sync.
	syncInterval time.Duration
}
// NewLazySyncRateLimiter creates a new LazySyncLimiter wrapping the given
// limiter. The sync interval is derived as interval/numSync, and a periodic
// audit job (running every interval) evicts counters that have gone stale.
//
// Returns an error if numSync is zero (which would otherwise cause a
// division-by-zero panic) or if the audit job cannot be registered.
func NewLazySyncRateLimiter(
	limiter ratelimiter.RateLimiter,
	interval time.Duration,
	numSync uint32,
	jobGroup *jobs.JobGroup,
) (*LazySyncRateLimiter, error) {
	// Guard against integer division by zero below.
	if numSync == 0 {
		return nil, errors.New("lazy-sync rate limiter: numSync must be greater than zero")
	}
	lazySyncInterval := time.Duration(int64(interval) / int64(numSync))
	lsl := &LazySyncRateLimiter{
		limiter:      limiter,
		jobGroup:     jobGroup,
		name:         limiter.Name() + "-lazy-sync",
		interval:     interval,
		syncInterval: lazySyncInterval,
	}
	// Register the periodic audit job with the job group.
	job := jobs.NewBasicJob(lsl.name, lsl.audit)
	if err := lsl.jobGroup.RegisterJob(job, jobs.JobConfig{
		ExecutionPeriod: config.MakeDuration(interval),
	}); err != nil {
		return nil, err
	}
	return lsl, nil
}
// Close deregisters the audit job and then closes the wrapped limiter.
// It returns an error only when deregistration fails; in that case the
// wrapped limiter is left open.
func (lsl *LazySyncRateLimiter) Close() error {
	if err := lsl.jobGroup.DeregisterJob(lsl.name); err != nil {
		return err
	}
	lsl.limiter.Close()
	return nil
}
// SetPassThrough sets whether the limiter should pass through requests when it
// is not initialized. The call is delegated verbatim to the wrapped limiter.
func (lsl *LazySyncRateLimiter) SetPassThrough(passThrough bool) {
	lsl.limiter.SetPassThrough(passThrough)
}
// GetPassThrough reports whether the limiter passes through requests when it
// is not initialized, as determined by the wrapped limiter.
func (lsl *LazySyncRateLimiter) GetPassThrough() bool {
	return lsl.limiter.GetPassThrough()
}
// audit is the periodic job body: it sweeps the counter map and evicts
// entries whose scheduled sync time is more than one full interval in the
// past — i.e. labels that have seen no recent traffic.
func (lsl *LazySyncRateLimiter) audit(ctx context.Context) (proto.Message, error) {
	now := time.Now()
	lsl.counters.Range(func(label, value interface{}) bool {
		c := value.(*counter)
		c.lock.Lock()
		defer c.lock.Unlock()
		// Stale counter: drop it so memory doesn't grow with dead labels.
		if now.After(c.nextSync.Add(lsl.interval)) {
			lsl.counters.Delete(label)
		}
		return true
	})
	return nil, nil
}
// Name returns the name of the limiter (wrapped limiter's name plus the
// "-lazy-sync" suffix assigned at construction).
func (lsl *LazySyncRateLimiter) Name() string {
	return lsl.name
}
// takeN is the shared implementation behind Take, TakeIfAvailable and Return.
// It accounts n tokens against the locally cached counter for label, only
// contacting the underlying limiter when the cache is stale or exhausted.
// Returns (ok, waitTime, remaining, current).
func (lsl *LazySyncRateLimiter) takeN(ctx context.Context, label string, n float64, canWait bool) (bool, time.Duration, float64, float64) {
	// In pass-through mode every request is admitted without accounting.
	if lsl.GetPassThrough() {
		return true, 0, 0, 0
	}
	now := time.Now()
	// syncRemote flushes the locally accumulated credit (plus n when the
	// caller is willing to wait) to the underlying limiter and refreshes the
	// cached availability. Must be called with c.lock held.
	syncRemote := func(c *counter, n float64) (bool, time.Duration, float64, float64) {
		tokens := c.credit
		if canWait {
			tokens += n
		}
		ok, waitTime, remaining, current := lsl.limiter.Take(ctx, label, tokens)
		c.credit = 0
		if waitTime > 0 {
			// Remote reports exhaustion: mark the counter waiting and defer
			// the next sync until the wait elapses.
			c.waiting = true
			c.nextSync = now.Add(waitTime)
		} else {
			c.waiting = false
			c.nextSync = now.Add(lsl.syncInterval)
		}
		if ok && !canWait {
			// Non-waiting take: n was not included in the remote request, so
			// claim it locally out of the freshly reported remaining tokens.
			if n <= remaining {
				c.credit = n
				remaining -= n
				current += n
			} else {
				ok = false
			}
		}
		c.available = remaining
		c.current = current
		return ok, waitTime, remaining, current
	}
	// checkLimit decides from the local cache when possible, syncing remotely
	// only when the cache is stale or a waiting caller needs more tokens.
	checkLimit := func(c *counter) (bool, time.Duration, float64, float64) {
		c.lock.Lock()
		defer c.lock.Unlock()
		// ret reports availability as seen locally: the last remote values
		// adjusted by the credit not yet flushed to the remote.
		ret := func(ok bool, waitTime time.Duration) (bool, time.Duration, float64, float64) {
			return ok, waitTime, c.available - c.credit, c.current + c.credit
		}
		if n <= 0 {
			// Returning tokens (n < 0) or a no-op (n == 0): adjust the local
			// credit only; it is reconciled at the next sync.
			c.credit += n
			return ret(true, 0)
		}
		if now.After(c.nextSync) {
			// Cache is stale — reconcile with the remote limiter now.
			return syncRemote(c, n)
		}
		waitTime := time.Duration(0)
		if c.waiting && !canWait {
			// Tokens are known exhausted and the caller won't wait: deny.
			return ret(false, 0)
		}
		if c.waiting {
			waitTime = c.nextSync.Sub(now)
		}
		if c.credit+n <= c.available {
			// Enough locally cached allowance — grant without a remote call.
			c.credit += n
			return ret(true, waitTime)
		}
		if canWait && !c.waiting {
			// Local allowance exhausted but the caller can wait: ask the
			// remote limiter directly.
			return syncRemote(c, n)
		}
		return ret(false, waitTime)
	}
	// Fetch the per-label counter, creating it lazily on first use.
	c := &counter{}
	existing, loaded := lsl.counters.LoadOrStore(label, c)
	if loaded {
		c = existing.(*counter)
	}
	return checkLimit(c)
}
// TakeIfAvailable takes n tokens from the limiter if they are available,
// never waiting for tokens to become free (canWait == false).
func (lsl *LazySyncRateLimiter) TakeIfAvailable(ctx context.Context, label string, n float64) (bool, time.Duration, float64, float64) {
	return lsl.takeN(ctx, label, n, false)
}
// Take takes n tokens from the limiter, allowing the request to wait for
// tokens to become available (canWait == true).
func (lsl *LazySyncRateLimiter) Take(ctx context.Context, label string, n float64) (bool, time.Duration, float64, float64) {
	return lsl.takeN(ctx, label, n, true)
}
// Return gives n tokens back to the limiter for the given label by taking a
// negative amount. It reports the resulting remaining and current counts.
func (lsl *LazySyncRateLimiter) Return(ctx context.Context, label string, n float64) (float64, float64) {
	_, _, remaining, current := lsl.takeN(ctx, label, -n, false)
	return remaining, current
}
// Make sure LazySyncRateLimiter implements the RateLimiter interface.
var _ ratelimiter.RateLimiter = (*LazySyncRateLimiter)(nil)