forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bucket.go
170 lines (140 loc) · 4.38 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ratelimit
import (
"math"
"sync"
"time"
)
// Bucket models a token bucket
type Bucket struct {
unitsPerNano float64
nanosPerUnit float64
capacity int64
mutex sync.Mutex
available int64
lastRefill int64
// fractionalAvailable "buffers" any amounts that flowed into the bucket smaller than one unit
// This lets us retain precision even with pathological refill rates like (1E9 + 1) per second
fractionalAvailable float64
}
// NewBucketWithRate creates a new token bucket, with maximum capacity = initial capacity, and a refill rate of qps
// We use floats for refill calculations, which introduces the possibility of truncation and rounding errors.
// For "sensible" qps values though, is is acceptable: jbeda did some tests here https://play.golang.org/p/LSKUOGz2LG
func NewBucketWithRate(qps float64, capacity int64) *Bucket {
unitsPerNano := qps / 1E9
nanosPerUnit := 1E9 / qps
b := &Bucket{
unitsPerNano: unitsPerNano,
nanosPerUnit: nanosPerUnit,
capacity: capacity,
available: capacity,
lastRefill: time.Now().UnixNano(),
}
return b
}
// Take takes n units from the bucket, reducing the available quantity even below zero,
// but then returns the amount of time we should wait
func (b *Bucket) Take(n int64) time.Duration {
b.mutex.Lock()
defer b.mutex.Unlock()
var d time.Duration
if b.available >= n {
// Fast path when bucket has sufficient availability before refilling
} else {
b.refill()
if b.available < n {
deficit := n - b.available
d = time.Duration(int64(float64(deficit) * b.nanosPerUnit))
}
}
b.available -= n
return d
}
// TakeAvailable immediately takes whatever quantity is available, up to max
func (b *Bucket) TakeAvailable(max int64) int64 {
b.mutex.Lock()
defer b.mutex.Unlock()
var took int64
if b.available >= max {
// Fast path when bucket has sufficient availability before refilling
took = max
} else {
b.refill()
took = b.available
if took < 0 {
took = 0
} else if took > max {
took = max
}
}
if took > 0 {
b.available -= took
}
return took
}
// Wait combines a call to Take with a sleep call
func (b *Bucket) Wait(n int64) {
d := b.Take(n)
if d != 0 {
time.Sleep(d)
}
}
// Capacity returns the maximum capacity of the bucket
func (b *Bucket) Capacity() int64 {
return b.capacity
}
// Available returns the quantity available in the bucket (which may be negative), but does not take it.
// This function is for diagnostic / informational purposes only - the returned capacity may immediately
// be inaccurate if another thread is operating on the bucket concurrently.
func (b *Bucket) Available() int64 {
b.mutex.Lock()
defer b.mutex.Unlock()
b.refill()
return b.available
}
// refill replenishes the bucket based on elapsed time; mutex must be held
func (b *Bucket) refill() {
// Note that we really want a monotonic clock here, but go says no:
// https://github.com/golang/go/issues/12914
now := time.Now().UnixNano()
b.refillAtTimestamp(now)
}
// refillAtTimestamp is the logic of the refill function, for testing
func (b *Bucket) refillAtTimestamp(now int64) {
nanosSinceLastRefill := now - b.lastRefill
if nanosSinceLastRefill <= 0 {
// we really want monotonic
return
}
// Compute units that have flowed into bucket
refillFloat := (float64(nanosSinceLastRefill) * b.unitsPerNano) + b.fractionalAvailable
if refillFloat > float64(b.capacity) {
// float64 > MaxInt64 can be converted to negative int64; side step this
b.available = b.capacity
// Don't worry about the fractional units with huge refill rates
} else {
whole, fraction := math.Modf(refillFloat)
refill := int64(whole)
b.fractionalAvailable = fraction
if refill != 0 {
// Refill with overflow
b.available += refill
if b.available >= b.capacity {
b.available = b.capacity
b.fractionalAvailable = 0
}
}
}
b.lastRefill = now
}