/
qps_limiter.go
120 lines (107 loc) · 2.9 KB
/
qps_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package limiter
import (
"sync/atomic"
"time"
)
// fixedWindowTime is the length of the window a limit is expressed against.
// calcOnce divides it by the tick interval to size each refill.
var fixedWindowTime = time.Second
// qpsLimiter implements the RateLimiter interface.
//
// It keeps a counter of available tokens that Acquire decrements and that a
// ticker-driven goroutine (startTicker -> updateToken) periodically refills.
// limit, tokens and once are read and written through sync/atomic; interval
// and ticker are plain fields.
type qpsLimiter struct {
// limit is the upper bound updateToken refills tokens to; Status reports it as max.
limit int32
// tokens is the number of tokens currently available. Acquire may briefly
// push it below zero; updateToken resets a negative value to once.
tokens int32
// interval is the ticker period, i.e. how often updateToken runs.
interval time.Duration
// once is the number of tokens added per tick, as computed by calcOnce.
once int32
// ticker delivers the ticks consumed by startTicker.
ticker *time.Ticker
}
// NewQPSLimiter builds a qpsLimiter that starts with limit tokens and is
// refilled every interval by a background goroutine.
//
// interval is handed to time.NewTicker, which panics for non-positive
// durations. The per-tick refill amount comes from calcOnce.
func NewQPSLimiter(interval time.Duration, limit int) RateLimiter {
	capacity := int32(limit)

	lim := new(qpsLimiter)
	lim.limit = capacity
	lim.tokens = capacity // start with a full bucket
	lim.interval = interval
	lim.once = calcOnce(interval, limit)
	lim.ticker = time.NewTicker(interval)

	// The refill loop runs for as long as the ticker keeps delivering ticks.
	go lim.startTicker()
	return lim
}
// UpdateLimit changes the QPS limit while keeping the current interval.
// It is **not** concurrent-safe.
//
// The per-tick refill amount is recomputed from the existing interval and the
// new limit before either value is published.
func (l *qpsLimiter) UpdateLimit(limit int) {
	perTick := calcOnce(l.interval, limit)

	atomic.StoreInt32(&l.limit, int32(limit))
	atomic.StoreInt32(&l.once, perTick)
}
// UpdateQPSLimit updates both the refill interval and the limit.
// It is **not** concurrent-safe.
//
// The new limit and per-tick refill amount are published first. If the
// interval changed, the existing ticker is re-armed in place with
// Ticker.Reset (Go 1.15+), which panics for non-positive intervals just like
// time.NewTicker does.
func (l *qpsLimiter) UpdateQPSLimit(interval time.Duration, limit int) {
	once := calcOnce(interval, limit)
	atomic.StoreInt32(&l.limit, int32(limit))
	atomic.StoreInt32(&l.once, once)

	if interval == l.interval {
		return
	}
	l.interval = interval
	// Reuse the ticker rather than Stop + NewTicker + a new goroutine:
	// Ticker.Stop does not close the channel, so the goroutine ranging over
	// the old ticker would block forever and one goroutine would leak per
	// interval change. Resetting keeps the single refill goroutine alive on
	// the same channel and avoids reassigning l.ticker while it is being read.
	l.ticker.Reset(interval)
}
// Acquire tries to take one token and reports whether it succeeded.
//
// When the counter is already at or below zero it returns false without
// touching it. Otherwise it decrements the counter and succeeds only if the
// result is still non-negative; concurrent callers that race past the check
// can push the counter below zero, in which case they get false.
func (l *qpsLimiter) Acquire() bool {
	if atomic.LoadInt32(&l.tokens) > 0 {
		remaining := atomic.AddInt32(&l.tokens, -1)
		return remaining >= 0
	}
	return false
}
// Status reports the limiter's current state: max is the configured limit,
// cur is the number of tokens available right now (it can be negative for a
// moment after contended Acquire calls), and interval is the refill period.
func (l *qpsLimiter) Status() (max, cur int, interval time.Duration) {
	return int(atomic.LoadInt32(&l.limit)), int(atomic.LoadInt32(&l.tokens)), l.interval
}
// startTicker is the refill loop: it calls updateToken once per tick of the
// ticker that is installed when the loop starts. It is meant to run in its
// own goroutine. Ticker.Stop does not close the channel, so stopping the
// ticker pauses this loop but does not make it return.
func (l *qpsLimiter) startTicker() {
	for range l.ticker.C {
		l.updateToken()
	}
}
// stopTicker stops the current ticker so that it delivers no further ticks.
// The ticker's channel stays open, so a goroutine receiving from it blocks
// instead of returning.
func (l *qpsLimiter) stopTicker() {
	current := l.ticker
	current.Stop()
}
// updateToken refills the bucket by one tick's worth of tokens.
//
//   - A negative balance (left behind by contended Acquire calls) is reset to
//     once.
//   - Otherwise once is added, capped at limit.
//
// Some deviation is allowed here to gain better performance: the load and the
// store of tokens are separate atomic operations, so tokens taken by Acquire
// in between are not accounted for.
func (l *qpsLimiter) updateToken() {
	// Snapshot once and limit a single time so the comparison and the
	// assignment below agree even if UpdateLimit runs concurrently.
	once := atomic.LoadInt32(&l.once)
	limit := atomic.LoadInt32(&l.limit)
	cur := atomic.LoadInt32(&l.tokens)

	var next int32
	switch {
	case cur < 0:
		next = once
	case int64(cur)+int64(once) > int64(limit):
		// Compare in 64 bits: cur+once wraps around in int32 for large
		// limits, which would store a negative balance instead of limit.
		next = limit
	default:
		next = cur + once
	}
	atomic.StoreInt32(&l.tokens, next)
}
// calcOnce returns how many tokens should be added on every tick so that
// roughly limit tokens are handed out per fixedWindowTime.
//
// interval is the tick period; anything longer than the window is treated as
// exactly one window, so a single refill never exceeds limit. The result is
// truncated toward zero and never negative.
func calcOnce(interval time.Duration, limit int) int32 {
	// Clamp against the same window the formula below divides by, rather
	// than a separate hard-coded duration.
	if interval > fixedWindowTime {
		interval = fixedWindowTime
	}
	ticksPerWindow := fixedWindowTime.Seconds() / interval.Seconds()
	once := int32(float64(limit) / ticksPerWindow)
	if once < 0 {
		return 0
	}
	return once
}