-
Notifications
You must be signed in to change notification settings - Fork 374
/
throttler.go
149 lines (126 loc) · 3.94 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (c) 2021 Terminus, Inc.
//
// This program is free software: you can use, redistribute, and/or modify
// it under the terms of the GNU Affero General Public License, version 3
// or later ("AGPL"), as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package throttler
import (
"sync"
"github.com/erda-project/erda/modules/pipeline/pipengine/queue/enhancedqueue"
"github.com/erda-project/erda/modules/pipeline/pipengine/queue/snapshot"
)
type Throttler interface {
// Name 节流阀的名字
Name() string
// AddQueue 幂等创建队列
AddQueue(name string, window int64)
// AddKeyToQueues 幂等将 key 同时插入指定队列;若队列不存在,则会首先幂等创建队列;若不同时插入,则可能直接被调度了
AddKeyToQueues(key string, reqs []AddKeyToQueueRequest)
PopPending(key string) (bool, []PopDetail)
PopProcessing(key string) (bool, []PopDetail)
snapshot.Snapshot
}
type throttler struct {
name string
queueByName map[string]*enhancedqueue.EnhancedQueue // 该节流阀关心的所有增强队列
keyRelatedQueues map[string]map[string]*enhancedqueue.EnhancedQueue // 所有 key 和队列的关联关系
lock sync.Mutex
}
func NewNamedThrottler(name string, initQueues map[string]int64) Throttler {
t := throttler{
name: name,
queueByName: make(map[string]*enhancedqueue.EnhancedQueue),
keyRelatedQueues: make(map[string]map[string]*enhancedqueue.EnhancedQueue),
lock: sync.Mutex{},
}
for name, window := range initQueues {
t.AddQueue(name, window)
}
return &t
}
func (t *throttler) Name() string {
return t.name
}
func (t *throttler) AddQueue(name string, window int64) {
t.lock.Lock()
defer t.lock.Unlock()
t.addQueue(name, window)
}
func (t *throttler) AddKeyToQueues(key string, reqs []AddKeyToQueueRequest) {
t.lock.Lock()
defer t.lock.Unlock()
for _, req := range reqs {
t.addKeyToQueue(key, req)
}
}
// PopPending 将指定 key 从所有关联队列 pending 弹出到 processing
// 若 key 不存在,返回可弹出
func (t *throttler) PopPending(key string) (bool, []PopDetail) {
t.lock.Lock()
defer t.lock.Unlock()
var popDetails []PopDetail
relatedQueues, ok := t.keyRelatedQueues[key]
if !ok {
// key 没有关联的增强队列,返回成功
popDetails = append(popDetails, PopDetail{
CanPop: true,
Reason: "no related queues",
})
return true, popDetails
}
canPop := true
for qName, eq := range relatedQueues {
poppedKey := eq.PopPending(true)
if poppedKey != key {
popDetails = append(popDetails, newPopDetail(qName, false, "cannot pop pending now, waiting for next time"))
canPop = false
continue
}
popDetails = append(popDetails, newPopDetail(qName, true, ""))
}
// 可弹出
if canPop {
// 遍历弹出
for _, eq := range relatedQueues {
eq.PopPending()
}
}
return canPop, popDetails
}
// PopProcessing
func (t *throttler) PopProcessing(key string) (bool, []PopDetail) {
t.lock.Lock()
defer t.lock.Unlock()
relatedQueues, ok := t.keyRelatedQueues[key]
if !ok {
return true, nil
}
var popDetails []PopDetail
canPop := true
for qName, eq := range relatedQueues {
poppedKey := eq.PopProcessing(key, true)
if poppedKey != key {
popDetails = append(popDetails, newPopDetail(qName, false, "cannot pop processing now, waiting for next time"))
canPop = false
continue
}
popDetails = append(popDetails, newPopDetail(qName, true, ""))
}
// 可弹出
if canPop {
// 遍历弹出
for _, eq := range relatedQueues {
eq.PopProcessing(key)
}
// cleanup
delete(t.keyRelatedQueues, key)
}
return canPop, popDetails
}