-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
124 lines (107 loc) · 2.37 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package scheduler
import (
"context"
"errors"
"sync"
"time"
"github.com/robfig/cron/v3"
)
// Scheduler keeps a name-indexed registry of tasks on top of a cron instance.
type Scheduler struct {
	// taskMap maps a task name to the cron entry ID it was scheduled under.
	taskMap map[string]cron.EntryID
	// lock is taken by the Scheduler methods while they read or modify taskMap.
	lock sync.Mutex
	// cron is the underlying cron instance that the tasks are added to.
	cron *cron.Cron
}
// Start starts the cron instance owned by the package-level scheduler.
func Start() {
	scheduler.cron.Start()
}
// TaskList returns a snapshot of the names of all tasks currently registered
// with the package-level scheduler.
//
// The names come from map iteration, so their order is unspecified. The
// result is nil when no task is registered. The scheduler lock is held while
// the names are copied so that a concurrent RegisterTask or RemoveTask cannot
// mutate the map mid-iteration.
func TaskList() (tasks []string) {
	scheduler.lock.Lock()
	defer scheduler.lock.Unlock()

	// Only allocate when there is something to return, so that an empty
	// registry still yields a nil slice.
	if n := len(scheduler.taskMap); n > 0 {
		tasks = make([]string, 0, n)
	}
	for name := range scheduler.taskMap {
		tasks = append(tasks, name)
	}
	return tasks
}
// RegisterTask registers task with the package-level scheduler using cfg.
// It forwards to (*Scheduler).RegisterTask and returns that method's error.
func RegisterTask(task Task, cfg *TaskConfig) error {
	return scheduler.RegisterTask(task, cfg)
}
// RemoveTask removes the task registered under name from the package-level
// scheduler by forwarding to (*Scheduler).RemoveTask.
func RemoveTask(name string) {
	scheduler.RemoveTask(name)
}
// Stop removes every registered task from the package-level scheduler and
// then stops its cron instance.
//
// The scheduler lock is held for the whole operation so that the registry is
// not read while another goroutine is registering or removing tasks.
//
// It returns the context produced by cron.Stop. Per the robfig/cron
// documentation, that context can be used to wait for running jobs to
// complete.
func Stop() context.Context {
	scheduler.lock.Lock()
	defer scheduler.lock.Unlock()

	// Deleting the current key while ranging over a map is permitted in Go.
	for name, id := range scheduler.taskMap {
		scheduler.cron.Remove(id)
		delete(scheduler.taskMap, name)
	}
	return scheduler.cron.Stop()
}
// GetTask looks up the task registered under name in the package-level
// scheduler by forwarding to (*Scheduler).GetTask.
func GetTask(name string) Task {
	return scheduler.GetTask(name)
}
// GetTaskStatus returns the status of the task registered under name in the
// package-level scheduler by forwarding to (*Scheduler).GetTaskStatus.
func GetTaskStatus(name string) *TaskStatus {
	return scheduler.GetTaskStatus(name)
}
// Exist reports whether a task is registered under name in the package-level
// scheduler by forwarding to (*Scheduler).Exist.
func Exist(name string) bool {
	return scheduler.Exist(name)
}
// scheduler is the package-level Scheduler that the exported package
// functions operate on.
var (
	scheduler = newScheduler()
)
// newScheduler builds a Scheduler with an empty task registry and a cron
// instance created with the cron.WithSeconds option.
func newScheduler() *Scheduler {
	s := new(Scheduler)
	s.taskMap = map[string]cron.EntryID{}
	s.cron = cron.New(cron.WithSeconds())
	return s
}
// Exist reports whether a task is currently registered under name.
// The registry is consulted while holding the scheduler lock.
func (s *Scheduler) Exist(name string) (exist bool) {
	s.lock.Lock()
	defer s.lock.Unlock()

	_, registered := s.taskMap[name]
	return registered
}
// GetTask returns the task registered under name, or nil when no such task
// is registered.
//
// The task is recovered from the Job field of the cron entry recorded for
// name. If that entry no longer holds a Task (for example, cron returns a
// zero-valued entry for an ID it does not know), nil is returned instead of
// panicking on the type assertion.
func (s *Scheduler) GetTask(name string) Task {
	s.lock.Lock()
	defer s.lock.Unlock()

	id, ok := s.taskMap[name]
	if !ok {
		return nil
	}

	// Comma-ok form: a nil or foreign Job yields the zero Task rather than a
	// runtime panic.
	task, _ := s.cron.Entry(id).Job.(Task)
	return task
}
// GetTaskStatus returns a TaskStatus built from the cron entry recorded for
// name, or nil when no task is registered under that name.
//
// The TaskStatus is filled positionally with the entry's Next and Prev
// values, in that order.
func (s *Scheduler) GetTaskStatus(name string) *TaskStatus {
	s.lock.Lock()
	defer s.lock.Unlock()

	id, found := s.taskMap[name]
	if !found {
		return nil
	}

	snapshot := s.cron.Entry(id)
	return &TaskStatus{snapshot.Next, snapshot.Prev}
}
// RemoveTask removes the task registered under name from both the cron
// instance and the registry. It does nothing when name is not registered.
func (s *Scheduler) RemoveTask(name string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	id, registered := s.taskMap[name]
	if !registered {
		return
	}

	s.cron.Remove(id)
	delete(s.taskMap, name)
}
// RegisterTask validates cfg, initialises task with cfg.Params and schedules
// it on the cron instance, recording the resulting entry ID under
// task.Name().
//
// Scheduling uses cfg.Cron as a cron spec when it is non-empty; otherwise the
// task runs at a fixed interval of cfg.Interval seconds.
//
// It returns TaskExistsErr when a task with the same name is already
// registered, or the error from cfg.Check, task.Init or cron.AddJob. Nothing
// is recorded in the registry when an error is returned.
func (s *Scheduler) RegisterTask(task Task, cfg *TaskConfig) (err error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if _, taken := s.taskMap[task.Name()]; taken {
		return TaskExistsErr
	}
	if err = cfg.Check(); err != nil {
		return err
	}
	if err = task.Init(cfg.Params); err != nil {
		return err
	}

	var id cron.EntryID
	switch {
	case cfg.Cron != "":
		if id, err = s.cron.AddJob(cfg.Cron, task); err != nil {
			return err
		}
	default:
		every := time.Second * time.Duration(cfg.Interval)
		id = s.cron.Schedule(cron.Every(every), task)
	}

	// The name is read again here, after Init has run, and used as the key.
	s.taskMap[task.Name()] = id
	return nil
}
// TaskExistsErr is returned by RegisterTask when a task with the same name
// is already registered.
var (
	TaskExistsErr = errors.New("task already exists")
)