forked from goharbor/harbor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
311 lines (258 loc) · 7.06 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package scheduler
import (
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/utils/log"
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"
)
const (
defaultQueueSize = 10
statSchedulePolicy = "Schedule Policy"
statUnSchedulePolicy = "Unschedule Policy"
statTaskRun = "Task Run"
statTaskComplete = "Task Complete"
statTaskFail = "Task Fail"
)
//StatItem is defined for the stat metrics.
type StatItem struct {
//Metrics catalog
Type string
//The stat value
Value uint32
//Attach some other info
Attachment interface{}
}
//StatSummary is used to collect some metrics of scheduler.
type StatSummary struct {
//Count of scheduled policy
PolicyCount uint32
//Total count of tasks
Tasks uint32
//Count of successfully complete tasks
CompletedTasks uint32
//Count of tasks with errors
TasksWithError uint32
}
//Configuration defines configuration of Scheduler.
type Configuration struct {
QueueSize uint8
}
//Scheduler is designed for scheduling policies.
type Scheduler struct {
//Mutex for sync controling.
*sync.RWMutex
//Related configuration options for scheduler.
config *Configuration
//Store to keep the references of scheduled policies.
policies Store
//Queue for receiving policy scheduling request
scheduleQueue chan *Watcher
//Queue for receiving policy unscheduling request or complete signal.
unscheduleQueue chan *Watcher
//Channel for receiving stat metrics.
statChan chan *StatItem
//Channel for terminate scheduler damon.
terminateChan chan bool
//The stat metrics of scheduler.
stats *StatSummary
//To indicate whether scheduler is running or not
isRunning bool
}
//DefaultScheduler is a default scheduler.
var DefaultScheduler = NewScheduler(nil)
//NewScheduler is constructor for creating a scheduler.
func NewScheduler(config *Configuration) *Scheduler {
var qSize uint8 = defaultQueueSize
if config != nil && config.QueueSize > 0 {
qSize = config.QueueSize
}
sq := make(chan *Watcher, qSize)
usq := make(chan *Watcher, qSize)
stChan := make(chan *StatItem, 4)
tc := make(chan bool, 1)
store := NewDefaultStore()
return &Scheduler{
RWMutex: new(sync.RWMutex),
config: config,
policies: store,
scheduleQueue: sq,
unscheduleQueue: usq,
statChan: stChan,
terminateChan: tc,
stats: &StatSummary{
PolicyCount: 0,
Tasks: 0,
CompletedTasks: 0,
TasksWithError: 0,
},
isRunning: false,
}
}
//Start the scheduler damon.
func (sch *Scheduler) Start() {
sch.Lock()
defer sch.Unlock()
//If scheduler is already running
if sch.isRunning {
return
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in scheduler:%s\n", r)
}
}()
defer func() {
//Clear resources
sch.policies.Clear()
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
}()
for {
select {
case <-sch.terminateChan:
//Exit
return
case wt := <-sch.scheduleQueue:
//If status is stopped, no requests should be served
if !sch.IsRunning() {
continue
}
go func(watcher *Watcher) {
if watcher != nil && watcher.p != nil {
//Enable it.
watcher.Start()
//Update stats and log info.
log.Infof("Policy %s is scheduled", watcher.p.Name())
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
}
}(wt)
case wt := <-sch.unscheduleQueue:
//If status is stopped, no requests should be served
if !sch.IsRunning() {
continue
}
go func(watcher *Watcher) {
if watcher != nil && watcher.IsRunning() {
watcher.Stop()
//Update stats and log info.
log.Infof("Policy %s is unscheduled", watcher.p.Name())
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
}
}(wt)
case stat := <-sch.statChan:
{
//If status is stopped, no requests should be served
if !sch.IsRunning() {
continue
}
switch stat.Type {
case statSchedulePolicy:
sch.stats.PolicyCount += stat.Value
break
case statUnSchedulePolicy:
sch.stats.PolicyCount -= stat.Value
break
case statTaskRun:
sch.stats.Tasks += stat.Value
break
case statTaskComplete:
sch.stats.CompletedTasks += stat.Value
break
case statTaskFail:
sch.stats.TasksWithError += stat.Value
break
default:
break
}
log.Infof("Policies:%d, Tasks:%d, CompletedTasks:%d, FailedTasks:%d\n",
sch.stats.PolicyCount,
sch.stats.Tasks,
sch.stats.CompletedTasks,
sch.stats.TasksWithError)
if stat.Attachment != nil &&
reflect.TypeOf(stat.Attachment).String() == "*errors.errorString" {
log.Errorf("%s: %s\n", stat.Type, stat.Attachment.(error).Error())
}
}
}
}
}()
sch.isRunning = true
log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339))
}
//Stop the scheduler damon.
func (sch *Scheduler) Stop() {
//Lock for state changing
sch.Lock()
//Check if the scheduler is running
if !sch.isRunning {
sch.Unlock()
return
}
sch.isRunning = false
sch.Unlock()
//Terminate damon to stop receiving signals.
sch.terminateChan <- true
}
//Schedule and enable the policy.
func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
if scheduledPolicy == nil {
return errors.New("nil is not Policy object")
}
if strings.TrimSpace(scheduledPolicy.Name()) == "" {
return errors.New("Policy should be assigned a name")
}
tasks := scheduledPolicy.Tasks()
if tasks == nil || len(tasks) == 0 {
return errors.New("Policy must attach task(s)")
}
//Try to schedule the policy.
//Keep the policy for future use after it's successfully scheduled.
watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue)
if err := sch.policies.Put(scheduledPolicy.Name(), watcher); err != nil {
return err
}
//Schedule the policy
sch.scheduleQueue <- watcher
return nil
}
//UnSchedule the specified policy from the enabled policies list.
func (sch *Scheduler) UnSchedule(policyName string) error {
if strings.TrimSpace(policyName) == "" {
return errors.New("Empty policy name is invalid")
}
//Find the watcher.
watcher := sch.policies.Remove(policyName)
if watcher == nil {
return fmt.Errorf("Policy %s is not existing", policyName)
}
//Unschedule the policy.
sch.unscheduleQueue <- watcher
return nil
}
//IsRunning to indicate whether the scheduler is running.
func (sch *Scheduler) IsRunning() bool {
sch.RLock()
defer sch.RUnlock()
return sch.isRunning
}
//HasScheduled is to check whether the given policy has been scheduled or not.
func (sch *Scheduler) HasScheduled(policyName string) bool {
return sch.policies.Exists(policyName)
}
//GetPolicy is used to get related policy reference by its name.
func (sch *Scheduler) GetPolicy(policyName string) policy.Policy {
wk := sch.policies.Get(policyName)
if wk != nil {
return wk.p
}
return nil
}
//PolicyCount returns the count of currently scheduled policies in the scheduler.
func (sch *Scheduler) PolicyCount() uint32 {
return sch.policies.Size()
}