forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
363 lines (293 loc) · 6.88 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
package scheduler
import (
"errors"
"runtime/debug"
"sync"
"time"
"github.com/elastic/beats/libbeat/logp"
)
// Scheduler runs registered jobs according to their Schedule. Every task is
// executed on its own goroutine, while the bookkeeping below is done by the
// scheduler loop (run) once Start has been called.
type Scheduler struct {
	limit    uint           // max number of concurrently active tasks; 0 means no limit
	running  bool           // set by Start, cleared by Stop
	location *time.Location // time zone used when computing schedule times
	jobs     []*job         // registered jobs
	active   uint           // number of active entries (tasks started by runTask that have not signalled finished yet)
	add, rm  chan *job      // job registration/removal requests handed to the running loop
	finished chan taskOverSignal // completion signals sent by task goroutines
	// list of tasks waiting to be executed: continuation tasks that could not be
	// started because the active-task limit was reached (only used when limit > 0)
	tasks []task
	done  chan struct{} // closed by Stop to terminate the run loop
	wg    sync.WaitGroup // waited on by Stop
}
// Canceller is a function returning an error.
// NOTE(review): not referenced by the code visible here; presumably meant as the
// type of the remove handle returned by Scheduler.Add (which has the same
// shape, func() error) — confirm before relying on it.
type Canceller func() error
// A job is a re-schedulable entry point in a set of tasks. Each task can return
// a new set of continuation tasks to be executed (subject to the active task
// limit). A job is only started again once all of its tasks have finished
// (running == 0); while tasks remain, due runs are skipped and the job is
// rescheduled instead.
type job struct {
	name       string    // used in log messages
	next       time.Time // time at which the job is next due
	schedule   Schedule  // computes the next due time
	fn         TaskFunc  // entry-point task started by startJob
	registered bool      // set by doAdd, cleared by doRemove
	running    uint32 // count number of active task for job (running or queued in Scheduler.tasks)
}
// A single task in an active job: the function to execute together with the
// job it belongs to. Field order matters: values are built positionally
// (task{j, fn}).
type task struct {
	job *job
	fn  TaskFunc
}
// TaskFunc is a single task in an active job. It may return continuation tasks
// that are executed, concurrently and subject to the scheduler's active-task
// limit, as part of the same job. The job is only considered finished once all
// of its tasks, including continuations, have returned. A panic inside a
// TaskFunc is recovered by the scheduler and treated as returning no
// continuations.
type TaskFunc func() []TaskFunc
// taskOverSignal is sent on Scheduler.finished when a task returns (or
// panics). Field order matters: values are built positionally.
type taskOverSignal struct {
	entry *job   // job the finished task belongs to
	cont  []task // continuation tasks to be executed concurrently for job at hand
}
// Schedule computes when a job is due. The scheduler calls Next with the
// current time to obtain a job's first due time, and with the job's previous
// due time to obtain each subsequent one.
type Schedule interface {
	Next(time.Time) time.Time
}
// debugf is the package's debug logger, created with the "scheduler" selector.
var debugf = logp.MakeDebug("scheduler")
// New returns a scheduler that evaluates job schedules in the local time zone
// (time.Local). limit caps the number of concurrently active tasks; 0 disables
// the cap.
func New(limit uint) *Scheduler {
	loc := time.Local
	return NewWithLocation(limit, loc)
}
// NewWithLocation returns a stopped scheduler that evaluates job schedules in
// the given location. limit caps the number of concurrently active tasks;
// 0 disables the cap.
func NewWithLocation(limit uint, location *time.Location) *Scheduler {
	s := &Scheduler{limit: limit, location: location}

	// running, jobs, active, tasks and wg intentionally keep their zero values.
	// All communication channels are unbuffered.
	s.add = make(chan *job)
	s.rm = make(chan *job)
	s.finished = make(chan taskOverSignal)
	s.done = make(chan struct{})

	return s
}
// Start launches the scheduler loop on a new goroutine. It returns an error if
// the scheduler is already running.
//
// The loop goroutine is registered with s.wg so that Stop, which waits on
// s.wg, actually blocks until the loop has exited. Previously nothing was ever
// added to s.wg, which made that wait a no-op.
func (s *Scheduler) Start() error {
	if s.running {
		return errors.New("scheduler already running")
	}

	s.running = true
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.run()
	}()
	return nil
}
// Stop marks the scheduler as stopped, closes the done channel to tell the
// scheduler loop to exit, and then waits on s.wg. It returns an error if the
// scheduler is not running.
func (s *Scheduler) Stop() error {
	if s.running {
		s.running = false
		close(s.done)
		s.wg.Wait()
		return nil
	}
	return errors.New("scheduler already stopped")
}
// Add registers a job named name that runs entrypoint according to sched.
//
// If the scheduler is running, the job is handed to the scheduler loop over the
// add channel, so the call blocks until the loop receives it. Otherwise the job
// is appended directly.
//
// The returned function unregisters the job again.
func (s *Scheduler) Add(sched Schedule, name string, entrypoint TaskFunc) func() error {
	debugf("Add scheduler job '%v'.", name)

	newJob := &job{
		name:       name,
		schedule:   sched,
		fn:         entrypoint,
		registered: false,
		running:    0,
	}

	if s.running {
		s.add <- newJob
	} else {
		s.doAdd(newJob)
	}

	return func() error {
		return s.remove(newJob)
	}
}
// remove unregisters j. While the scheduler is running, the request is sent to
// the scheduler loop over the rm channel; otherwise it is applied directly.
// It always returns nil.
func (s *Scheduler) remove(j *job) error {
	debugf("Remove scheduler job '%v'", j.name)

	if s.running {
		s.rm <- j
		return nil
	}

	s.doRemove(j)
	return nil
}
// run is the scheduler's main loop; Start launches it on its own goroutine.
// While it runs, it is the only code reading or writing s.jobs, s.active and
// s.tasks. Other goroutines talk to it through the add, rm, finished and done
// channels.
//
// Each iteration does three things:
//   - It re-sorts s.jobs when a job's next time may have changed. sortEntries
//     is defined outside this view; the loop relies on s.jobs[0] being the job
//     due first.
//   - It arms a timer for the earliest job if a task slot is free.
//   - It handles exactly one event:
//   - timer fired: start every due job that is not still running, as far as
//     the active-task limit allows;
//   - task finished: update counters, start at most one waiting job, then
//     start queued tasks and the finished task's continuations as slots
//     permit, queueing the rest in s.tasks;
//   - add/rm: register or unregister a job;
//   - done: return.
//
// On return, one completion signal is drained per still-active task so task
// goroutines blocked sending on s.finished can exit.
func (s *Scheduler) run() {
	defer func() {
		// drain finished queue for active jobs to not leak
		// go-routines on exit
		for i := uint(0); i < s.active; i++ {
			<-s.finished
		}
	}()

	debugf("Start scheduler.")
	defer debugf("Scheduler stopped.")

	// Jobs registered before Start do not have a due time yet.
	now := time.Now().In(s.location)
	for _, j := range s.jobs {
		j.next = j.schedule.Next(now)
	}

	resched := true

	var timer *time.Timer
	for {
		if resched {
			sortEntries(s.jobs)
		}
		resched = true

		// Only arm a new timer when a task slot is free; otherwise the loop
		// waits for a task to finish first.
		if (s.limit == 0 || s.active < s.limit) && len(s.jobs) > 0 {
			next := s.jobs[0].next
			debugf("Next wakeup time: %v", next)

			if timer != nil {
				timer.Stop()
			}
			timer = time.NewTimer(next.Sub(time.Now().In(s.location)))
		}

		// A nil channel never becomes ready, which disables the timer case.
		var timeSignal <-chan time.Time
		if timer != nil {
			timeSignal = timer.C
		}

		select {
		case now = <-timeSignal:
			for _, j := range s.jobs {
				if now.Before(j.next) {
					break // s.jobs is sorted by next: no later job is due either
				}

				if j.running > 0 {
					debugf("Scheduled job '%v' still active.", j.name)
					reschedActive(j, now)
					continue
				}

				if s.limit > 0 && s.active == s.limit {
					logp.Info("Scheduled job '%v' waiting.", j.name)
					// The job stays due; it is picked up when a task finishes.
					timer = nil
					continue
				}

				s.startJob(j)
			}

		case sig := <-s.finished:
			s.active--
			j := sig.entry
			debugf("Job '%v' returned at %v (cont=%v).", j.name, time.Now(), len(sig.cont))

			// add number of job continuation tasks returned to current job task
			// counter and remove count for task just being finished
			j.running += uint32(len(sig.cont))
			j.running--

			count := 0       // number of rescheduled waiting jobs
			started := false // whether a waiting job was started below

			// try to start waiting jobs
			for _, waiting := range s.jobs {
				if now.Before(waiting.next) {
					break
				}

				if waiting.running > 0 {
					count++
					reschedActive(waiting, now)
					continue
				}

				debugf("Start waiting job: %v", waiting.name)
				s.startJob(waiting)
				started = true
				break
			}

			// Try to start waiting tasks of already running jobs.
			// The s.tasks waiting list will only have any entries if `s.limit > 0`.
			if s.limit > 0 && (s.active < s.limit) {
				if T := uint(len(s.tasks)); T > 0 {
					N := s.limit - s.active
					debugf("start up to %v waiting tasks (%v)", N, T)
					if N > T {
						N = T
					}

					tasks := s.tasks[:N]
					s.tasks = s.tasks[N:]
					for _, t := range tasks {
						s.runTask(t)
					}
				}
			}

			// try to start returned tasks for current job and put left-over tasks into
			// waiting list.
			if N := len(sig.cont); N > 0 {
				if s.limit > 0 {
					limit := int(s.limit - s.active)
					if N > limit {
						N = limit
					}
				}

				if N > 0 {
					debugf("start returned tasks")
					tasks := sig.cont[:N]
					sig.cont = sig.cont[N:]
					for _, t := range tasks {
						s.runTask(t)
					}
				}
			}
			if len(sig.cont) > 0 {
				s.tasks = append(s.tasks, sig.cont...)
			}

			// Both reschedActive and startJob move a job's next time forward,
			// so the job list must be re-sorted whenever either one ran.
			// Previously a started waiting job did not trigger a re-sort. The
			// timer was then armed for a job that was no longer due first,
			// which delayed other due jobs.
			resched = count > 0 || started

		case j := <-s.add:
			j.next = j.schedule.Next(time.Now().In(s.location))
			s.doAdd(j)

		case j := <-s.rm:
			s.doRemove(j)

		case <-s.done:
			debugf("done")
			return
		}
	}
}
// reschedActive handles a job that is due while its previous run still has
// active tasks. It logs the skip and, if the job's due time has been reached,
// advances next by one step of the job's schedule.
func reschedActive(j *job, now time.Time) {
	logp.Info("Scheduled job '%v' already active.", j.name)

	if now.Before(j.next) {
		return
	}
	j.next = j.schedule.Next(j.next)
}
// startJob advances j to its next due time, counts one more active task for
// it, and launches its entry-point task.
func (s *Scheduler) startJob(j *job) {
	j.next = j.schedule.Next(j.next)
	j.running++

	debugf("Start job '%v' at %v.", j.name, time.Now())
	s.runTask(task{job: j, fn: j.fn})
}
// runTask counts t as active and executes it on a new goroutine. When the
// task returns, its continuations are reported via signalFinished. If the task
// panics, the panic and stack trace are logged and the task is reported as
// finished without continuations, so the active counters stay balanced.
func (s *Scheduler) runTask(t task) {
	s.active++

	go func(owner *job, fn TaskFunc) {
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			logp.Err("Panic in job '%v'. Recovering, but please report this: %s.",
				owner.name, r)
			logp.Err("Stacktrace: %s", debug.Stack())
			s.signalFinished(owner, nil)
		}()

		s.signalFinished(owner, fn())
	}(t.job, t.fn)
}
// doAdd appends j to the registered jobs and flags it as registered. It does
// not sort or set j.next; callers take care of that.
func (s *Scheduler) doAdd(j *job) {
	s.jobs = append(s.jobs, j)
	j.registered = true
}
// doRemove unregisters j if it is present in s.jobs; unknown jobs are ignored.
// The entry is removed by moving the last job into its slot, so the order of
// s.jobs is not preserved. The run loop re-sorts after a removal.
func (s *Scheduler) doRemove(j *job) {
	for i, other := range s.jobs {
		if other != j {
			continue
		}

		// delete entry, not preserving order
		last := len(s.jobs) - 1
		s.jobs[i] = s.jobs[last]
		// Clear the vacated slot so the backing array does not keep a stale
		// job pointer (the removed job itself, when it was the last entry)
		// reachable after truncation.
		s.jobs[last] = nil
		s.jobs = s.jobs[:last]

		// mark entry as unregistered
		j.registered = false
		return
	}
}
// signalFinished reports a finished task of job j to the scheduler loop. Each
// returned continuation function is wrapped into a task bound to j. The send
// on s.finished blocks until the loop (or its exit drain) receives it.
func (s *Scheduler) signalFinished(j *job, cont []TaskFunc) {
	var tasks []task
	if n := len(cont); n > 0 {
		tasks = make([]task, 0, n)
		for _, fn := range cont {
			tasks = append(tasks, task{job: j, fn: fn})
		}
	}
	s.finished <- taskOverSignal{entry: j, cont: tasks}
}