-
Notifications
You must be signed in to change notification settings - Fork 0
/
rate.go
81 lines (72 loc) · 1.4 KB
/
rate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package rate
import (
"sync/atomic"
"time"
)
// Rate is a token-bucket style limiter. A background goroutine launched by
// Start adds bucketAddSize tokens once per second, and Get removes tokens,
// waiting while too few are available.
type Rate struct {
// bucketSize is the bucket capacity; NewRate sets it to twice the refill amount.
bucketSize int64
// bucketSurplusSize is the number of tokens currently in the bucket. It is
// modified through sync/atomic by add and Get.
bucketSurplusSize int64
// bucketAddSize is the number of tokens the refill loop tries to add per tick.
bucketAddSize int64
// stopChan carries the stop request from Stop to the refill goroutine.
stopChan chan bool
// NowRate is recomputed on every refill tick from the bucket level observed
// just before the refill. It looks like an estimate of the tokens consumed
// during the last second - TODO confirm the intended meaning. It is written
// by the refill goroutine without synchronization.
NowRate int64
}
// NewRate returns a limiter whose refill amount is addSize tokens per tick and
// whose capacity is twice that amount. The bucket starts empty and nothing is
// refilled until Start is called.
func NewRate(addSize int64) *Rate {
	r := new(Rate)
	r.bucketAddSize = addSize
	r.bucketSize = 2 * addSize
	r.bucketSurplusSize = 0
	// Unbuffered: a send on this channel waits for the refill loop to receive it.
	r.stopChan = make(chan bool)
	return r
}
// Start launches the refill loop (session) in a new goroutine and returns
// immediately. Every call starts another loop, while a single Stop call is
// consumed by only one loop.
func (s *Rate) Start() {
go s.session()
}
// add puts up to size tokens into the bucket without ever exceeding bucketSize.
//
// The amount actually added is min(size, free space). Non-positive sizes and a
// bucket that is already full (or over-full) are no-ops. The update is done
// with a compare-and-swap loop so that the "how much room is left" check and
// the increment form one atomic step with respect to concurrent Get/add calls.
func (s *Rate) add(size int64) {
	if size <= 0 {
		return
	}
	for {
		cur := atomic.LoadInt64(&s.bucketSurplusSize)
		free := s.bucketSize - cur
		if free <= 0 {
			return
		}
		n := size
		if n > free {
			// Clamp against the requested amount, not the refill amount, so a
			// small return never tops the bucket up and a large one never
			// overflows it.
			n = free
		}
		if atomic.CompareAndSwapInt64(&s.bucketSurplusSize, cur, cur+n) {
			return
		}
		// Another goroutine changed the level in between; retry with a fresh read.
	}
}
// ReturnBucket gives size tokens back to the bucket by delegating to add.
func (s *Rate) ReturnBucket(size int64) {
s.add(size)
}
// Stop asks the refill goroutine to exit by sending on stopChan. NewRate
// creates the channel unbuffered, so the send blocks until the goroutine
// started by Start receives it. NOTE(review): the only receiver visible in
// this file is session, so calling Stop before Start, or more often than
// Start, appears to block the caller indefinitely.
func (s *Rate) Stop() {
s.stopChan <- true
}
// Get removes size tokens from the bucket, blocking until that many are
// available. When the first attempt fails it re-checks every 100ms.
//
// Get has no timeout or cancellation: it returns only once the bucket holds at
// least size tokens.
func (s *Rate) Get(size int64) {
	if s.tryTake(size) {
		return
	}
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if s.tryTake(size) {
			return
		}
	}
}

// tryTake removes size tokens if the bucket currently holds at least that
// many and reports whether it did. The check and the subtraction happen as one
// compare-and-swap, so concurrent callers cannot both spend the same tokens.
func (s *Rate) tryTake(size int64) bool {
	for {
		cur := atomic.LoadInt64(&s.bucketSurplusSize)
		if cur < size {
			return false
		}
		if atomic.CompareAndSwapInt64(&s.bucketSurplusSize, cur, cur-size) {
			return true
		}
		// Lost a race with another Get/add; re-read and try again.
	}
}
// session is the refill loop run by Start. Once per second it records NowRate
// from the bucket level observed just before refilling and then adds
// bucketAddSize tokens via add. It returns when a value arrives on stopChan.
func (s *Rate) session() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Take one atomic snapshot of the level: other goroutines update it
			// through sync/atomic, and both branches below must see the same value.
			surplus := atomic.LoadInt64(&s.bucketSurplusSize)
			if used := s.bucketAddSize - surplus; used > 0 {
				s.NowRate = used
			} else {
				s.NowRate = s.bucketSize - surplus
			}
			// NowRate stays a plain store: its position in the struct is not
			// guaranteed to be 64-bit aligned on 32-bit platforms, which the
			// sync/atomic functions require.
			s.add(s.bucketAddSize)
		case <-s.stopChan:
			return
		}
	}
}