/
ratelimit.go
106 lines (95 loc) · 2.29 KB
/
ratelimit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Package ratelimit provides a rate limiter implemented with a token bucket.
package ratelimit
import (
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
)
// tokenBucket is a token-bucket rate limiter built on a buffered channel:
// every UUID waiting in the channel is one token available to consumers.
type tokenBucket struct {
	// TokenBucket buffers the tokens that are currently available.
	TokenBucket chan uuid.UUID

	// quit is closed by Stop; the goroutine started by FillToken exits when
	// it observes the close.
	quit chan struct{}
}
// InitTokenBucket creates an empty token bucket that can hold up to capacity
// tokens. The returned bucket contains no tokens; its quit channel is open.
func InitTokenBucket(capacity int32) *tokenBucket {
	bucket := new(tokenBucket)
	bucket.TokenBucket = make(chan uuid.UUID, capacity)
	bucket.quit = make(chan struct{})
	return bucket
}
// Preheat puts reserved tokens into an empty bucket so that consumers can
// fetch tokens right away, before periodic filling has produced any.
//
// It returns an error when reserved exceeds the bucket capacity, when the
// bucket already holds tokens, or when a token UUID cannot be generated. In
// the last case the tokens added before the failure stay in the bucket. A
// reserved value of zero or less adds nothing and returns nil.
func (t *tokenBucket) Preheat(reserved int32) error {
	if reserved > int32(cap(t.TokenBucket)) {
		return errors.Errorf("reserved:%d shall not bigger than tokenBucket capacity:%d", reserved, cap(t.TokenBucket))
	}
	if len(t.TokenBucket) != 0 {
		return errors.Errorf("preheat shall only used for empty bucket!")
	}
	for i := int32(0); i < reserved; i++ {
		uid, err := uuid.NewV4()
		if err != nil {
			// Do not enqueue a token whose ID could not be generated.
			return errors.Wrap(err, "generate preheat token")
		}
		select {
		case t.TokenBucket <- uid:
		default:
			// The bucket filled up concurrently (something else is adding
			// tokens); drop the extra token instead of blocking.
		}
	}
	return nil
}
// Flush discards tokens from the bucket for flushDuration: tokens already in
// the bucket and any that arrive during the window are received and dropped.
// It blocks until the window elapses and always returns nil.
//
// A non-positive flushDuration ends the window immediately, so Flush returns
// right away and may leave tokens in the bucket.
func (t *tokenBucket) Flush(flushDuration time.Duration) error {
	// A one-shot timer marks the end of the window; unlike a ticker it
	// accepts any duration, and it is released as soon as Flush returns.
	timer := time.NewTimer(flushDuration)
	defer timer.Stop()
	for {
		select {
		case <-t.TokenBucket:
			// Token discarded; keep draining until the window closes.
		case <-timer.C:
			return nil
		}
	}
}
// FillToken starts a background goroutine that adds one token to the bucket
// every fillInterval, and then blocks the caller.
//
// When maxRuntime is positive, FillToken returns after maxRuntime has
// elapsed; the goroutine is not stopped by this and keeps filling until the
// quit channel is closed. When maxRuntime is zero or negative, FillToken
// blocks until the goroutine exits, i.e. until the quit channel is closed.
//
// While the bucket is full the goroutine waits for free space or for the quit
// channel, whichever comes first. fillInterval must be positive, because
// time.NewTicker panics otherwise.
func (t *tokenBucket) FillToken(fillInterval time.Duration, maxRuntime time.Duration) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Without Done the Wait below could never return, even after quit
		// has been closed and this goroutine has exited.
		defer wg.Done()

		ticker := time.NewTicker(fillInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
			case <-t.quit:
				return
			}

			// The error cannot be returned from this goroutine, so it is
			// deliberately ignored: the token is enqueued with whatever ID
			// NewV4 produced (presumably the zero UUID on failure - confirm
			// against the uuid package), which keeps the fill rate steady.
			uid, _ := uuid.NewV4()

			// Also watch quit while sending, so a full bucket cannot keep
			// this goroutine alive after the limiter has been stopped.
			select {
			case t.TokenBucket <- uid:
			case <-t.quit:
				return
			}
			fmt.Printf("bucket length: %d with %v\n", len(t.TokenBucket), time.Now())
		}
	}()

	if maxRuntime > 0 {
		// One-shot wait; the filler goroutine keeps running in the background.
		<-time.After(maxRuntime)
		return
	}
	wg.Wait()
}
// FetchToken tries to take one token from the bucket without blocking. It
// returns the token's string form and true when a token was available, or an
// empty string and false when the bucket was empty.
func (t *tokenBucket) FetchToken() (string, bool) {
	select {
	case id := <-t.TokenBucket:
		fmt.Printf("fetch token:%s\n", id)
		return id.String(), true
	default:
		// Nothing buffered right now; report the miss instead of waiting.
		return "", false
	}
}
// Stop drains the bucket for one second and then closes the quit channel,
// which signals the goroutine started by FillToken to exit.
//
// Calling Stop again after it has completed is a no-op. Stop is not safe to
// call from several goroutines at the same time.
func (t *tokenBucket) Stop() {
	select {
	case <-t.quit:
		// Already stopped: closing quit a second time would panic.
		return
	default:
	}

	// Flush always returns nil, so there is no error to handle here.
	_ = t.Flush(time.Second * 1)
	close(t.quit)
}