-
Notifications
You must be signed in to change notification settings - Fork 0
/
jobs.go
117 lines (98 loc) · 2.79 KB
/
jobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package shigoto
import (
"errors"
"fmt"
"reflect"
"time"
cronparser "github.com/KodepandaID/shigoto/pkg/cron-parser"
"github.com/KodepandaID/shigoto/pkg/mongodb-connector"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// Jobs describes one job to schedule: which function it refers to, the
// arguments recorded for it, and the cron expression it runs on.
type Jobs struct {
	// client is the MongoDB connector used to insert the job and its tasks.
	client *mongodb.Connector
	// parser converts Cron into a schedule.
	parser *cronparser.Parser

	// JobName is stored with the job and, together with JobParams, is used
	// to detect an already-stored task.
	JobName string
	// FuncName is stored with the job; presumably the name the function was
	// registered under — confirm against the registration code.
	FuncName string
	// JobParams are the arguments recorded for the job's tasks.
	JobParams []interface{}
	// Cron is the cron expression that makes the job run periodically by
	// second, minute and hour.
	Cron []string
}
// Do registers the job and schedules its first task.
//
// The cron expression in j.Cron is parsed relative to the current time, the
// job is inserted into the job collection, and the task is stored for the
// computed schedule.
//
// It returns the ID reported by InsertJobCollection. A non-nil ID combined
// with the "already registered" error is treated as success: the task is
// still stored and the returned error is nil. Any other outcome returns the
// ID and error from InsertJobCollection unchanged.
//
// A cron expression that cannot be parsed is reported as an error wrapping
// the parser's error (with primitive.NilObjectID) instead of panicking, so
// callers can handle bad input through the normal error return.
func (j *Jobs) Do() (id primitive.ObjectID, e error) {
	schedule, parseErr := j.parser.SetCurrentTime(time.Now()).Parse(j.Cron)
	if parseErr != nil {
		return primitive.NilObjectID, fmt.Errorf("parsing cron format of job %q: %w", j.JobName, parseErr)
	}

	id, e = j.client.InsertJobCollection(&mongodb.JobCollection{
		JobName:    j.JobName,
		FuncName:   j.FuncName,
		CronFormat: j.Cron,
	})

	// Without an ID there is nothing to attach a task to.
	if id == primitive.NilObjectID {
		return id, e
	}
	// The "already registered" error is expected when the same job is set up
	// again; every other error is passed on to the caller.
	if e != nil && e.Error() != "Jobs is already registered, use the different job name" {
		return id, e
	}

	j.storedTask(id, schedule)
	return id, nil
}
// CallFunc calls the value stored in FuncStorage under funcName with no
// arguments and returns whatever HandleErrFunc extracts from its results.
//
// If the lookup does not produce a valid reflect value (for example, when
// nothing usable is stored under funcName), an error is returned and nothing
// is called.
func CallFunc(funcName string) (e error) {
	fn := reflect.ValueOf(FuncStorage[funcName])
	if fn.IsValid() {
		// A nil argument list is the same as an empty one for Call.
		return HandleErrFunc(fn.Call(nil))
	}
	return errors.New("Function invalid, check your function register")
}
// CallFuncWithParams calls the function stored in FuncStorage under funcName,
// passing params as its arguments, and returns whatever HandleErrFunc
// extracts from its results.
//
// Like CallFunc, it returns an error instead of panicking when funcName does
// not resolve to a callable function.
//
// A nil entry in params is passed as the zero value of the corresponding
// declared parameter type (nil for pointers, interfaces, slices, maps, ...),
// because reflect.ValueOf(nil) is a zero Value that Call refuses.
//
// An argument count or type that does not match the function's signature
// still makes reflect.Value.Call panic.
func CallFuncWithParams(funcName string, params []interface{}) (e error) {
	fn := reflect.ValueOf(FuncStorage[funcName])
	// Kind is Invalid for a missing entry, so this covers both "not
	// registered" and "registered value is not a function".
	if fn.Kind() != reflect.Func {
		return errors.New("Function invalid, check your function register")
	}

	ft := fn.Type()
	last := ft.NumIn() - 1

	in := make([]reflect.Value, len(params))
	for k, param := range params {
		if param != nil {
			in[k] = reflect.ValueOf(param)
			continue
		}
		switch {
		case ft.IsVariadic() && k >= last:
			// Variadic arguments are passed one by one, so the element
			// type of the final slice parameter applies.
			in[k] = reflect.Zero(ft.In(last).Elem())
		case k <= last:
			in[k] = reflect.Zero(ft.In(k))
		}
		// Surplus arguments are left untouched; Call reports the mismatch.
	}

	return HandleErrFunc(fn.Call(in))
}
// HandleErrFunc scans the results of a reflected function call and returns
// the error they carry, if any.
//
// Only values whose type is the built-in error interface are considered. If
// several of them are non-nil, the last one wins. The returned error wraps
// the original one: its message is unchanged, and errors.Is / errors.As can
// still reach the underlying error. It returns nil when no non-nil error is
// found.
func HandleErrFunc(values []reflect.Value) (e error) {
	errorType := reflect.TypeOf((*error)(nil)).Elem()

	for _, val := range values {
		// A zero Value has no type; skip it rather than panic in Type().
		if !val.IsValid() || val.Type() != errorType || val.IsNil() {
			continue
		}
		if !val.CanInterface() {
			// The error cannot be extracted (value reached through an
			// unexported field); fall back to formatting it.
			e = fmt.Errorf("%s", val)
			continue
		}
		if err, ok := val.Interface().(error); ok {
			// %w keeps the message identical while preserving the chain.
			e = fmt.Errorf("%w", err)
		}
	}
	return e
}
// storedTask records the job in ScheduleStorage under the schedule's next run
// time and inserts a task for it through the MongoDB connector.
//
// Entries are grouped by schedule.Next.String(). If that group already holds
// an entry with the same job name and deeply equal params, nothing is stored
// and no task is inserted.
func (j *Jobs) storedTask(id primitive.ObjectID, schedule cronparser.Schedule) {
	key := schedule.Next.String()

	var tasks []map[string]interface{}
	if stored := ScheduleStorage[key]; stored != nil {
		tasks = stored.([]map[string]interface{})

		// A task with the same params and job name is already scheduled
		// for this time, so it is ignored.
		duplicate := false
		for _, task := range tasks {
			if reflect.DeepEqual(task["params"].([]interface{}), j.JobParams) && task["job_name"].(string) == j.JobName {
				duplicate = true
			}
		}
		if duplicate {
			return
		}
	}

	// tasks is nil for a new key, so append yields a one-element slice.
	ScheduleStorage[key] = append(tasks, map[string]interface{}{
		"id":        id.Hex(),
		"job_name":  j.JobName,
		"func_name": j.FuncName,
		"params":    j.JobParams,
		"cron":      j.Cron,
	})
	j.client.InsertTask(id, j.JobParams...)
}