/
run.go
174 lines (167 loc) · 5.31 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package gojobs
import (
"context"
"fmt"
"go.dtapp.net/goip"
"go.dtapp.net/gotime"
"go.dtapp.net/gotrace_id"
"log/slog"
"strings"
)
// Filter 过滤
// ctx 上下文
// isMandatoryIp 强制当前ip
// specifyIp 指定Ip
// tasks 过滤前的数据
// newTasks 过滤后的数据
func (c *Client) Filter(ctx context.Context, isMandatoryIp bool, specifyIp string, tasks []GormModelTask, isPrint bool) (newTasks []GormModelTask) {
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】是强制性Ip:%v;指定Ip:%v;任务数量:%v", isMandatoryIp, specifyIp, len(tasks)))
if specifyIp == "" {
specifyIp = goip.IsIp(c.GetCurrentIp())
} else {
specifyIp = goip.IsIp(specifyIp)
}
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】指定Ip重新解析:%v", specifyIp))
for _, v := range tasks {
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】任务指定Ip解析前:%v", v.SpecifyIP))
v.SpecifyIP = goip.IsIp(v.SpecifyIP)
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】任务指定Ip重新解析:%v", v.SpecifyIP))
// 强制只能是当前的ip
if isMandatoryIp {
c.Println(ctx, isPrint, "【Filter入参】进入强制性Ip")
if v.SpecifyIP == specifyIp {
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】进入强制性Ip 添加任务:%v", v.ID))
newTasks = append(newTasks, v)
continue
}
}
if v.SpecifyIP == "" {
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】任务指定Ip为空 添加任务:%v", v.ID))
newTasks = append(newTasks, v)
continue
} else if v.SpecifyIP == SpecifyIpNull {
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】任务指定Ip无限制 添加任务:%v", v.ID))
newTasks = append(newTasks, v)
continue
} else {
// 判断是否包含该ip
specifyIpFind := strings.Contains(v.SpecifyIP, ",")
if specifyIpFind {
c.Println(ctx, isPrint, "【Filter入参】进入强制性多Ip")
// 分割字符串
parts := strings.Split(v.SpecifyIP, ",")
for _, vv := range parts {
if vv == specifyIp {
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】进入强制性多Ip 添加任务:%v", v.ID))
newTasks = append(newTasks, v)
continue
}
}
} else {
c.Println(ctx, isPrint, "【Filter入参】进入强制性单Ip")
if v.SpecifyIP == specifyIp {
newTasks = append(newTasks, v)
c.Println(ctx, isPrint, fmt.Sprintf("【Filter入参】进入强制性单Ip 添加任务:%v", v.ID))
continue
}
}
}
}
return newTasks
}
// Run 运行
func (c *Client) Run(ctx context.Context, task GormModelTask, taskResultCode int, taskResultDesc string) {
runId := gotrace_id.GetTraceIdContext(ctx)
if runId == "" {
if c.slog.status {
slog.InfoContext(ctx, "上下文没有跟踪编号")
}
return
}
if c.gormConfig.taskLogStatus {
go c.GormTaskLogRecord(ctx, task, runId, taskResultCode, taskResultDesc)
}
switch taskResultCode {
case 0:
err := c.EditTask(ctx, c.gormConfig.client, task.ID).
Select("run_id", "result", "next_run_time").
Updates(GormModelTask{
RunID: runId,
Result: taskResultDesc,
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
if c.slog.status {
slog.InfoContext(ctx, fmt.Sprintf("保存失败:%s", err))
}
}
return
case CodeSuccess:
// 执行成功
err := c.EditTask(ctx, c.gormConfig.client, task.ID).
Select("status_desc", "number", "run_id", "updated_ip", "result", "next_run_time").
Updates(GormModelTask{
StatusDesc: "执行成功",
Number: task.Number + 1,
RunID: runId,
UpdatedIP: c.config.systemOutsideIP,
Result: taskResultDesc,
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
if c.slog.status {
slog.InfoContext(ctx, fmt.Sprintf("保存失败:%s", err))
}
}
case CodeEnd:
// 执行成功、提前结束
err := c.EditTask(ctx, c.gormConfig.client, task.ID).
Select("status", "status_desc", "number", "updated_ip", "result", "next_run_time").
Updates(GormModelTask{
Status: TASK_SUCCESS,
StatusDesc: "结束执行",
Number: task.Number + 1,
UpdatedIP: c.config.systemOutsideIP,
Result: taskResultDesc,
NextRunTime: gotime.Current().Time,
}).Error
if err != nil {
if c.slog.status {
slog.InfoContext(ctx, fmt.Sprintf("保存失败:%s", err))
}
}
case CodeError:
// 执行失败
err := c.EditTask(ctx, c.gormConfig.client, task.ID).
Select("status_desc", "number", "run_id", "updated_ip", "result", "next_run_time").
Updates(GormModelTask{
StatusDesc: "执行失败",
Number: task.Number + 1,
RunID: runId,
UpdatedIP: c.config.systemOutsideIP,
Result: taskResultDesc,
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
if c.slog.status {
slog.InfoContext(ctx, fmt.Sprintf("保存失败:%s", err))
}
}
}
if task.MaxNumber != 0 {
if task.Number+1 >= task.MaxNumber {
// 关闭执行
err := c.EditTask(ctx, c.gormConfig.client, task.ID).
Select("status").
Updates(GormModelTask{
Status: TASK_TIMEOUT,
}).Error
if err != nil {
if c.slog.status {
slog.InfoContext(ctx, fmt.Sprintf("保存失败:%s", err))
}
}
}
}
return
}