/
speed.go
94 lines (79 loc) · 1.77 KB
/
speed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package utils
import (
"time"
"math"
LOG "github.com/vinllen/log4go"
)
type Qos struct {
Limit int64 // qps, <= 0 means disable limit
Ticket int64 // one tick size, default is 1
bucket chan struct{} // bucket channel
addr *int64 // periodically check whether the address value is equal to Limit, and update if not.
close bool // channel is closed?
prevLimit int64 // previous address limit
}
func StartQoS(limit, ticket int64, addr *int64) *Qos {
if ticket <= 0 {
// illegal
return nil
}
q := new(Qos)
q.Limit = limit
q.Ticket = ticket
q.addr = addr
q.bucket = make(chan struct{}, limit)
go q.timer()
return q
}
func (q *Qos) FetchBucket() {
for q.Limit > 0 { // the old bucket channel maybe release, so we need to retry once timeout
select {
case <-q.bucket:
return
case <-time.After(time.Second * 1):
break
}
}
}
func (q *Qos) resizeLimit() {
// we must empty previous channel first to avoid memory leak
FOR:
for {
select {
case <-q.bucket:
default:
// break if bucket if empty
break FOR
}
}
LOG.Info("clear old channel, set new bucket size[%v]", q.Limit)
q.bucket = make(chan struct{}, q.Limit)
}
func (q *Qos) timer() {
var i int64
for range time.NewTicker(1 * time.Second).C {
if q.close {
return
}
if *q.addr != q.prevLimit {
LOG.Info("try to resize bucket channel from %v to %v, bucket size[%v], ticket[%v]",
q.prevLimit, *q.addr, q.Limit, q.Ticket)
q.prevLimit = *q.addr
// 0 is ok
q.Limit = int64(math.Ceil(float64(*q.addr) / float64(q.Ticket)))
q.resizeLimit()
}
INJECT:
for i = 0; i < q.Limit; i++ {
select {
case q.bucket <- struct{}{}:
default:
// break if bucket if full
break INJECT
}
}
}
}
func (q *Qos) Close() {
q.close = true
}