forked from libopenstorage/openstorage
/
sched.go
227 lines (199 loc) · 4.62 KB
/
sched.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package sched
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/libopenstorage/openstorage/pkg/dbg"
)
// TaskID identifies a task registered with a Scheduler. Schedule returns one
// and Cancel accepts one.
type TaskID uint64
const (
// TaskNone is the zero TaskID. Schedule returns it alongside an error and
// ValidTaskID reports it as invalid.
TaskNone = TaskID(0)
// numGoRoutines is the number of runTasks worker goroutines started by New.
numGoRoutines = 10
)
// ValidTaskID reports whether t is anything other than TaskNone.
func ValidTaskID(t TaskID) bool { return t != TaskNone }
// ScheduleTask is the callback a Scheduler runs. It is invoked with the
// Interval the task was scheduled with.
type ScheduleTask func(Interval)
// Scheduler runs registered tasks at their configured intervals.
type Scheduler interface {
// Schedule registers task to run at the given interval.
// runAt is the reference time from which the first run is derived, and
// onlyOnce requests a single execution instead of a recurring one.
// Returns the associated task id if scheduled successfully,
// or a non-nil error in case of error.
Schedule(task ScheduleTask, interval Interval,
runAt time.Time, onlyOnce bool) (TaskID, error)
// Cancel cancels the task identified by taskID.
// Returns a non-nil error if no such task is currently scheduled.
Cancel(taskID TaskID) error
// Start restarts scheduling.
Start()
// Stop stops scheduling.
Stop()
}
// instance is the package-wide Scheduler assigned by Init and returned by
// Instance; it is nil until Init has been called.
var instance Scheduler
// taskInfo is the bookkeeping record the manager keeps for one scheduled task.
type taskInfo struct {
// ID is the unique task identifier.
ID TaskID
// task is the function to run.
task ScheduleTask
// interval is the interval at which the task is scheduled; it is also the
// argument passed to task when it runs.
interval Interval
// runAt is the next time at which the task is due to be enqueued.
runAt time.Time
// onlyOnce requests one-time execution only.
onlyOnce bool
// valid is true until the task is cancelled.
valid bool
// lock guards enqueued, and runAt once the task is in the task list.
lock sync.Mutex
// enqueued is true from the moment the task is queued to run until its
// run has completed.
enqueued bool
}
// manager is the Scheduler implementation returned by New.
type manager struct {
// The embedded mutex is held by Schedule, Cancel, Stop and Start while they
// touch tasks, currTaskID, ticker and started.
sync.Mutex
// minimumInterval defines the minimum task scheduling interval; it is the
// period of ticker.
minimumInterval time.Duration
// tasks is the list of scheduled tasks (*taskInfo values).
tasks *list.List
// currTaskID grows monotonically; it is incremented to produce the next
// task ID.
currTaskID TaskID
// ticker ticks every minimumInterval.
ticker *time.Ticker
// started records whether the scheduler is considered running; Stop
// clears it.
started bool
// enqueuedTasksLock protects enqueuedTasks.
enqueuedTasksLock sync.Mutex
// cv signals when there are enqueuedTasks; its locker is
// enqueuedTasksLock.
cv *sync.Cond
// enqueuedTasks is the list of tasks that must be run now.
enqueuedTasks *list.List
}
// Schedule registers task with the manager and returns the ID assigned to it.
//
// The task's first due time is interval.nextAfter(runAt). onlyOnce is recorded
// on the task so the scheduling loop can drop it after it has been queued once.
//
// TaskNone and an error are returned when task is nil, or when the next
// occurrence of interval after the current time is less than one second away.
func (s *manager) Schedule(
	task ScheduleTask,
	interval Interval,
	runAt time.Time,
	onlyOnce bool,
) (TaskID, error) {
	s.Lock()
	defer s.Unlock()

	if task == nil {
		return TaskNone, fmt.Errorf("Invalid task specified")
	}
	if current := time.Now(); interval.nextAfter(current).Sub(current) < time.Second {
		return TaskNone, fmt.Errorf("Minimum interval is a second")
	}

	// IDs are handed out starting at 1, so TaskNone (0) is never assigned.
	s.currTaskID++

	info := new(taskInfo)
	info.ID = s.currTaskID
	info.task = task
	info.interval = interval
	info.runAt = interval.nextAfter(runAt)
	info.valid = true
	info.onlyOnce = onlyOnce

	s.tasks.PushBack(info)
	return info.ID, nil
}
// Cancel marks the task with the given ID as no longer valid and removes it
// from the list of scheduled tasks.
//
// It returns an error if no task in the list carries taskID.
func (s *manager) Cancel(
	taskID TaskID,
) error {
	s.Lock()
	defer s.Unlock()

	elem := s.tasks.Front()
	for elem != nil {
		info := elem.Value.(*taskInfo)
		if info.ID != taskID {
			elem = elem.Next()
			continue
		}
		// Clearing valid lets a worker that already holds this task skip it.
		info.valid = false
		s.tasks.Remove(elem)
		return nil
	}
	return fmt.Errorf("Invalid task ID: %v", taskID)
}
// Stop halts the ticker and marks the scheduler as not started, then discards
// every task that was queued to run but has not yet been picked up by a
// worker. Tasks a worker has already dequeued are not affected.
func (s *manager) Stop() {
	haltTicker := func() {
		s.Lock()
		defer s.Unlock()
		s.ticker.Stop()
		s.started = false
	}
	// Stop running any scheduled tasks.
	dropQueued := func() {
		s.enqueuedTasksLock.Lock()
		defer s.enqueuedTasksLock.Unlock()
		s.enqueuedTasks.Init()
	}

	haltTicker()
	dropQueued()
}
// Start (re)starts scheduling. It is a no-op when the scheduler is already
// marked as started.
//
// minimumInterval must be positive: both time.NewTicker and Ticker.Reset
// panic otherwise.
func (s *manager) Start() {
	s.Lock()
	defer s.Unlock()

	if s.started {
		return
	}
	if s.ticker == nil {
		s.ticker = time.NewTicker(s.minimumInterval)
	} else {
		// Re-arm the existing ticker instead of allocating a new one: the
		// scheduling loop receives from this ticker's channel and may already
		// be blocked on it, so it would never observe a replacement. Reusing
		// the ticker also avoids abandoning one that is still running.
		s.ticker.Reset(s.minimumInterval)
	}
	// Record the state so that repeated Start calls do not re-arm the ticker.
	s.started = true
}
// scheduleTasks is the scheduling loop; it never returns. On every tick it
// walks the task list, collects each task that is due (runAt is not after the
// current time) and not already enqueued, marks it enqueued, and drops
// one-shot tasks from the list. The collected tasks are then appended to the
// run queue and all workers waiting on the condition variable are woken.
func (s *manager) scheduleTasks() {
	for {
		<-s.ticker.C
		now := time.Now()

		var ready []*taskInfo
		s.Lock()
		for e := s.tasks.Front(); e != nil; {
			// Capture the successor before a possible Remove: removing an
			// element clears its links, after which e.Next() returns nil and
			// the remainder of the list would be skipped for this tick.
			next := e.Next()

			t := e.Value.(*taskInfo)
			t.lock.Lock()
			if !t.enqueued && !now.Before(t.runAt) {
				ready = append(ready, t)
				t.enqueued = true
				if t.onlyOnce {
					s.tasks.Remove(e)
				}
			}
			t.lock.Unlock()

			e = next
		}
		s.Unlock()

		// The run queue has its own lock, so the task list is not held while
		// handing work to the workers.
		s.enqueuedTasksLock.Lock()
		for _, t := range ready {
			s.enqueuedTasks.PushBack(t)
		}
		s.cv.Broadcast()
		s.enqueuedTasksLock.Unlock()
	}
}
// runTasks is the body of a worker goroutine; it never returns. It waits for
// the run queue to become non-empty, removes the task at the front, and, if
// the task is still valid, runs it with its interval. After the run it
// computes the task's next due time and clears its enqueued flag so the
// scheduling loop may queue it again.
//
// NOTE(review): t.valid is read here without holding a lock, while Cancel
// writes it under the manager mutex — confirm whether that is acceptable.
func (s *manager) runTasks() {
	for {
		s.cv.L.Lock()
		// Wait must sit in a loop that re-checks the predicate: another
		// worker may have drained the queue, or Stop may have cleared it,
		// between the Broadcast and this goroutine reacquiring the lock.
		for s.enqueuedTasks.Len() == 0 {
			s.cv.Wait()
		}
		front := s.enqueuedTasks.Front()
		t := front.Value.(*taskInfo)
		s.enqueuedTasks.Remove(front)
		s.cv.L.Unlock()

		// Skip tasks cancelled after they were queued.
		if !t.valid {
			continue
		}

		// Run outside the queue lock so other workers can pick up tasks.
		t.task(t.interval)

		t.lock.Lock()
		t.runAt = t.interval.nextAfter(time.Now())
		t.enqueued = false
		t.lock.Unlock()
	}
}
// New builds a Scheduler whose ticker fires every minimumInterval, starts
// numGoRoutines worker goroutines that run queued tasks, starts scheduling,
// and launches the scheduling loop.
//
// minimumInterval must be positive; creating the ticker panics otherwise.
// The goroutines started here run for the lifetime of the process.
func New(minimumInterval time.Duration) Scheduler {
	// No ticker is allocated here: Start arms it for a freshly built manager,
	// and allocating one in the literal as well would leave an extra ticker
	// running that nothing ever stops.
	m := &manager{
		tasks:           list.New(),
		minimumInterval: minimumInterval,
		enqueuedTasks:   list.New(),
	}
	m.cv = sync.NewCond(&m.enqueuedTasksLock)

	for i := 0; i < numGoRoutines; i++ {
		go m.runTasks()
	}

	// Start must run before the scheduling loop is launched, because the loop
	// reads m.ticker as soon as it begins.
	m.Start()
	go m.scheduleTasks()
	return m
}
func Init(minimumInterval time.Duration) {
dbg.Assert(instance == nil, "Scheduler already initialized")
instance = New(minimumInterval)
}
// Instance returns the package-wide Scheduler installed by Init, or nil if
// Init has not been called.
func Instance() Scheduler {
return instance
}