/
lazy-sync-rate-tracker.go
159 lines (140 loc) · 4.11 KB
/
lazy-sync-rate-tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package ratetracker
import (
"context"
"sync"
"sync/atomic"
"time"
"google.golang.org/protobuf/proto"
"github.com/fluxninja/aperture/pkg/config"
"github.com/fluxninja/aperture/pkg/jobs"
)
// counter holds the per-label token bookkeeping of a LazySyncRateTracker.
//
// After construction both fields are accessed through sync/atomic.
type counter struct {
	// global is the most recent count reported by the underlying limiter.
	//
	// It is deliberately the first field: 64-bit atomic operations require
	// 64-bit alignment on 32-bit platforms (386, ARM, 32-bit MIPS), and only
	// the first word of an allocated struct is guaranteed to be so aligned.
	global int64
	// local is the number of tokens taken since the last sync that have not
	// yet been reported to the underlying limiter.
	local int32
}
// LazySyncRateTracker is a rate tracker that counts tokens locally per label
// and syncs those counts lazily with an underlying RateTracker.
type LazySyncRateTracker struct {
	// totalCounters is the number of entries tracked in counters. It is
	// accessed with 64-bit atomic operations and is therefore kept as the
	// first field, which guarantees 64-bit alignment on 32-bit platforms.
	totalCounters int64
	// counters maps a label (string) to its *counter.
	counters sync.Map
	// limiter is the underlying rate tracker that local counts are flushed to.
	limiter RateTracker
	// jobGroup is the job group the periodic sync job is registered with.
	jobGroup *jobs.JobGroup
	// name identifies this tracker and its sync job.
	name string
	// syncDuration is the period (and timeout) of the sync job.
	syncDuration time.Duration
}
// NewLazySyncRateTracker wraps limiter in a LazySyncRateTracker and registers
// a job on jobGroup that runs the tracker's sync every syncDuration; the same
// duration is used as the job's execution timeout. The tracker is named after
// the wrapped limiter with a "-lazy-sync" suffix.
//
// If the job cannot be registered, the registration error is returned and no
// tracker is produced.
func NewLazySyncRateTracker(limiter RateTracker,
	syncDuration time.Duration,
	jobGroup *jobs.JobGroup,
) (RateTracker, error) {
	tracker := &LazySyncRateTracker{
		name:         limiter.Name() + "-lazy-sync",
		limiter:      limiter,
		syncDuration: syncDuration,
		jobGroup:     jobGroup,
	}

	// Schedule the periodic flush of local counts to the wrapped limiter.
	syncJob := jobs.NewBasicJob(tracker.name, tracker.sync)
	jobConfig := jobs.JobConfig{
		ExecutionPeriod:  config.MakeDuration(syncDuration),
		ExecutionTimeout: config.MakeDuration(syncDuration),
	}
	if err := tracker.jobGroup.RegisterJob(syncJob, jobConfig); err != nil {
		return nil, err
	}
	return tracker, nil
}
// Close deregisters the sync job from the job group and then closes the
// underlying limiter.
//
// If deregistration fails, that error is returned and the underlying limiter
// is left open. Otherwise the result of closing the underlying limiter is
// returned, so a failure to close it is no longer silently dropped.
func (lsl *LazySyncRateTracker) Close() error {
	if err := lsl.jobGroup.DeregisterJob(lsl.name); err != nil {
		return err
	}
	return lsl.limiter.Close()
}
// sync flushes the locally accumulated token counts to the underlying limiter.
//
// Each counter's local count is atomically reset to zero. Counters that had
// nothing pending are removed from the map; for every other counter a
// goroutine reports the pending tokens to the underlying limiter and records
// the count it returns as the counter's new global value. When ctx carries a
// deadline, those reports are staggered evenly across the time remaining.
//
// sync does not wait for the spawned goroutines and always returns (nil, nil).
func (lsl *LazySyncRateTracker) sync(ctx context.Context) (proto.Message, error) {
	numCounters := atomic.LoadInt64(&lsl.totalCounters)
	if numCounters == 0 {
		return nil, nil
	}

	// Without a deadline every report is sent immediately (zero spacing).
	var spacing time.Duration
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		// Keep 5ms in reserve for processing overhead and the last few
		// reports, never letting the budget go negative.
		budget := time.Until(deadline) - 5*time.Millisecond
		if budget < 0 {
			budget = 0
		}
		spacing = time.Duration(int64(budget) / numCounters)
	}

	// position is the index of the next scheduled report; it only advances
	// for counters that actually have something to flush.
	var position int64
	lsl.counters.Range(func(key, value interface{}) bool {
		entry := value.(*counter)
		pending := atomic.SwapInt32(&entry.local, 0)
		if pending == 0 {
			// Nothing was taken since the last sync: drop the idle counter.
			atomic.AddInt64(&lsl.totalCounters, -1)
			lsl.counters.Delete(key)
			return true
		}

		go func(slot int64) {
			time.Sleep(time.Duration(slot * int64(spacing)))
			_, _, current := lsl.limiter.TakeN(key.(string), int(pending))
			atomic.StoreInt64(&entry.global, int64(current))
		}(position)
		position++
		return true
	})
	return nil, nil
}
// Name reports the name of this tracker. NewLazySyncRateTracker sets it to the
// wrapped limiter's name followed by "-lazy-sync".
func (lsl *LazySyncRateTracker) Name() string {
	return lsl.name
}
// TakeN takes n tokens for label.
//
// If a counter for label already exists, the tokens are only added to its
// local count and the limit is checked against local plus the last known
// global count; the underlying limiter is not contacted until the next sync.
// Otherwise the tokens are taken from the underlying limiter directly and a
// counter seeded with the count it returned is stored for later calls.
//
// It returns whether the limit check passed, the remaining tokens reported by
// the limit check, and the total count used for that check.
func (lsl *LazySyncRateTracker) TakeN(label string, n int) (bool, int, int) {
	// checkLimit evaluates the rate limit for label against total.
	checkLimit := func(total int) (bool, int, int) {
		ok, remaining := lsl.limiter.GetRateLimitChecker().CheckRateLimit(label, total)
		return ok, remaining, total
	}

	// Fast path: the label is already tracked, so account for the tokens
	// locally and let the next sync report them.
	if v, found := lsl.counters.Load(label); found {
		c := v.(*counter)
		local := atomic.AddInt32(&c.local, int32(n))
		return checkLimit(int(local) + int(atomic.LoadInt64(&c.global)))
	}

	// Slow path: fall back to the underlying limiter.
	ok, remaining, current := lsl.limiter.TakeN(label, n)
	fresh := &counter{
		local:  int32(0), // these tokens were already taken from the underlying limiter
		global: int64(current),
	}
	existing, loaded := lsl.counters.LoadOrStore(label, fresh)
	if !loaded {
		// A new counter was stored: keep the counter total in step.
		atomic.AddInt64(&lsl.totalCounters, 1)
		return ok, remaining, current
	}

	// Another caller stored a counter for this label first. The n tokens have
	// already been taken from the underlying limiter above, so they must NOT
	// be added to the local count as well; doing so would report them a second
	// time on the next sync. Only fold in tokens other callers have pending
	// locally, on top of the freshest global count known here.
	c := existing.(*counter)
	global := int(atomic.LoadInt64(&c.global))
	if current > global {
		global = current
	}
	return checkLimit(int(atomic.LoadInt32(&c.local)) + global)
}
// Take takes a single token for label. It is shorthand for TakeN(label, 1) and
// returns TakeN's results unchanged.
func (lsl *LazySyncRateTracker) Take(label string) (bool, int, int) {
	const single = 1
	return lsl.TakeN(label, single)
}
// GetRateLimitChecker exposes the limit checker of the wrapped limiter; the
// lazy-sync tracker keeps no checker of its own.
func (lsl *LazySyncRateTracker) GetRateLimitChecker() RateLimitChecker {
	checker := lsl.limiter.GetRateLimitChecker()
	return checker
}