-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.helper.go
431 lines (350 loc) · 13.9 KB
/
task.helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
package gojobs
import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"go.dtapp.net/gojson"
"go.dtapp.net/gorequest"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"log/slog"
"strings"
"time"
)
type TaskHelper struct {
cfg *taskHelperConfig // 配置
taskType string // [任务]类型
taskList []*GormModelTask // [任务]列表
Ctx context.Context // [启动]上下文
Span trace.Span // [启动]链路追踪
}
// NewTaskHelper 任务帮助
// ctx 链路追踪的上下文
// taskType 任务类型
// logIsDebug 日志是否启动
// traceIsFilter 链路追踪是否过滤
func NewTaskHelper(rootCtx context.Context, taskType string, opts ...TaskHelperOption) *TaskHelper {
th := &TaskHelper{
taskType: taskType,
}
// 配置
th.cfg = newTaskHelperConfig(opts)
if gorequest.GetRequestIDContext(rootCtx) == "" {
rootCtx = gorequest.SetRequestIDContext(rootCtx)
}
// 启动OpenTelemetry链路追踪
th.Ctx, th.Span = NewTraceStartSpan(rootCtx, th.taskType)
th.Span.SetAttributes(attribute.String("task.help.helper", "db"))
th.Span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(th.Ctx)))
th.Span.SetAttributes(attribute.String("task.new.type", th.taskType))
th.Span.SetAttributes(attribute.Bool("task.cfg.logIsDebug", th.cfg.logIsDebug))
th.Span.SetAttributes(attribute.Bool("task.cfg.traceIsFilter", th.cfg.traceIsFilter))
th.Span.SetAttributes(attribute.String("task.cfg.traceIsFilterKeyName", th.cfg.traceIsFilterKeyName))
th.Span.SetAttributes(attribute.String("task.cfg.traceIsFilterKeyValue", th.cfg.traceIsFilterKeyValue))
return th
}
// QueryTaskList 通过回调函数获取任务列表
// rootCtx 链路追踪的上下文
// isRunCallback 任务列表回调函数 返回 是否使用 任务列表
// listCallback 任务回调函数 返回 任务列表
// newTaskLists 新的任务列表
// isContinue 是否继续
func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func(ctx context.Context, keyName string) (isUse bool, result *redis.StringCmd), listCallback func(ctx context.Context, taskType string) []*GormModelTask) (isContinue bool) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "QueryTaskList")
// 任务列表回调函数
if isRunCallback != nil {
// 执行
isRunUse, isRunResult := isRunCallback(ctx, GetRedisKeyName(th.taskType))
if isRunUse {
if isRunResult.Err() != nil {
if errors.Is(isRunResult.Err(), redis.Nil) {
err := fmt.Errorf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error())
//th.listSpan.RecordError( err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error()))
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
err := fmt.Errorf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error())
//th.listSpan.RecordError( err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error()))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
if isRunResult.Val() == "" {
err := fmt.Errorf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val())
//th.listSpan.RecordError(err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val()))
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
}
}
// 任务列表回调函数
if listCallback != nil {
// 执行
taskLists := listCallback(ctx, th.taskType)
// 判断任务类型是否一致
for _, vTask := range taskLists {
if vTask.Type == th.taskType {
th.taskList = append(th.taskList, vTask)
}
}
}
// 没有任务需要执行
if len(th.taskList) <= 0 {
if th.cfg.logIsDebug {
slog.InfoContext(ctx, "QueryTaskList 没有任务需要执行")
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
// OpenTelemetry链路追踪
span.SetAttributes(attribute.String("task.list.list", gojson.JsonEncodeNoError(th.taskList)))
span.SetAttributes(attribute.Int("task.list.count", len(th.taskList)))
span.End() // 结束OpenTelemetry链路追踪
return true
}
// FilterTaskList 过滤任务列表
// rootCtx 链路追踪的上下文
// isMandatoryIp 强制当前ip
// specifyIp 指定Ip
// isContinue 是否继续
func (th *TaskHelper) FilterTaskList(rootCtx context.Context, isMandatoryIp bool, specifyIp string) (isContinue bool) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "FilterTaskList")
span.SetAttributes(attribute.String("task.help.helper", "db"))
span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(ctx)))
if th.cfg.logIsDebug {
slog.DebugContext(ctx, "FilterTaskList 过滤任务列表", slog.Bool("isMandatoryIp", isMandatoryIp), slog.String("specifyIp", specifyIp))
}
if specifyIp != "" {
// 新的任务列表
var newTaskLists []*GormModelTask
// 解析指定IP
specifyIp = gorequest.IpIs(specifyIp)
// 循环判断 过滤指定IP
for _, vTask := range th.taskList {
vTask.SpecifyIP = gorequest.IpIs(vTask.SpecifyIP)
// 强制只能是当前的IP
if isMandatoryIp {
// 进入强制性IP
if vTask.SpecifyIP == specifyIp {
// 进入强制性IP,可添加任务
newTaskLists = append(newTaskLists, vTask)
continue
}
}
if vTask.SpecifyIP == "" {
// 任务指定IP为空,可添加任务
newTaskLists = append(newTaskLists, vTask)
continue
} else if vTask.SpecifyIP == SpecifyIpNull {
// 任务指定Ip无限制,可添加任务
newTaskLists = append(newTaskLists, vTask)
continue
} else {
// 判断是否包含该IP
specifyIpFind := strings.Contains(vTask.SpecifyIP, ",")
if specifyIpFind {
// 进入强制性多IP
// 分割字符串
parts := strings.Split(vTask.SpecifyIP, ",")
for _, vv := range parts {
if vv == specifyIp {
// 进入强制性多IP,可添加任务
newTaskLists = append(newTaskLists, vTask)
continue
}
}
} else {
// 进入强制性单IP
if vTask.SpecifyIP == specifyIp {
// 进入强制性单IP,可添加任务
newTaskLists = append(newTaskLists, vTask)
continue
}
}
}
}
// 设置任务列表
th.taskList = newTaskLists
}
// 没有任务需要执行
if len(th.taskList) <= 0 {
if th.cfg.logIsDebug {
slog.InfoContext(ctx, "FilterTaskList 没有任务需要执行")
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
// OpenTelemetry链路追踪
span.SetAttributes(attribute.String("task.filter.list", gojson.JsonEncodeNoError(th.taskList)))
span.SetAttributes(attribute.Int("task.filter.count", len(th.taskList)))
span.End() // 结束OpenTelemetry链路追踪
return true
}
// GetTaskList 请在FilterTaskList之后获取任务列表
func (th *TaskHelper) GetTaskList() []*GormModelTask {
return th.taskList
}
// RunMultipleTask 运行多个任务
// rootCtx 链路追踪的上下文
// wait 等待时间(秒)
// executionCallback 执行任务回调函数 返回 runCode=状态 runDesc=描述
// updateCallback 执行更新回调函数
// startCallback 开始任务回调函数
// endCallback 结束任务回调函数
func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task *GormModelTask) (runCode int, runDesc string), updateCallback func(ctx context.Context, task *GormModelTask, result *TaskHelperRunSingleTaskResponse), startCallback func(ctx context.Context, taskType string) (err error), endCallback func(ctx context.Context, taskType string)) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "RunMultipleTask")
span.SetAttributes(attribute.String("task.help.helper", "db"))
span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(ctx)))
span.SetAttributes(attribute.Int64("task.multiple.wait", wait))
span.SetAttributes(attribute.String("task.multiple.list", gojson.JsonEncodeNoError(th.taskList)))
span.SetAttributes(attribute.Int("task.multiple.count", len(th.taskList)))
if th.cfg.logIsDebug {
slog.DebugContext(ctx, "RunMultipleTask 运行多个任务", slog.Int64("wait", wait))
}
// 执行开始任务回调函数
if startCallback != nil && endCallback != nil {
err := startCallback(ctx, th.taskType)
if err != nil {
err = fmt.Errorf("开始任务回调函数返回错误,无法继续运行: %s", err)
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("开始任务回调函数返回错误,无法继续运行: %s", err))
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
}
for _, vTask := range th.taskList {
// 运行单个任务
th.RunSingleTask(ctx, vTask, executionCallback, updateCallback)
// 等待 wait 秒
if wait > 0 {
time.Sleep(time.Duration(wait) * time.Second)
}
}
// 执行结束任务回调函数
if startCallback != nil && endCallback != nil {
endCallback(ctx, th.taskType)
}
span.End() // 结束OpenTelemetry链路追踪
return
}
type TaskHelperRunSingleTaskResponse struct {
RunID string // 运行编号
RunCode int // 运行状态
RunDesc string // 运行描述
TraceID string // 追踪编号
SpanID string // 跨度编号
RequestID string // 请求编号
}
// RunSingleTask 运行单个任务
// rootCtx 链路追踪的上下文
// task 任务
// executionCallback 执行任务回调函数 返回 runCode=状态 runDesc=描述
// updateCallback 执行更新回调函数
func (th *TaskHelper) RunSingleTask(rootCtx context.Context, task *GormModelTask, executionCallback func(ctx context.Context, task *GormModelTask) (runCode int, runDesc string), updateCallback func(ctx context.Context, task *GormModelTask, result *TaskHelperRunSingleTaskResponse)) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "RunSingleTask "+task.CustomID)
span.SetAttributes(attribute.String("task.help.helper", "db"))
span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(ctx)))
span.SetAttributes(attribute.String("task.single.info", gojson.JsonEncodeNoError(task)))
if th.cfg.logIsDebug {
slog.DebugContext(ctx, "RunSingleTask 运行单个任务", slog.String("task", gojson.JsonEncodeNoError(task)))
}
// 任务回调函数
if executionCallback != nil {
// 需要返回的结构
result := TaskHelperRunSingleTaskResponse{
TraceID: span.SpanContext().TraceID().String(),
SpanID: span.SpanContext().SpanID().String(),
RequestID: gorequest.GetRequestIDContext(ctx),
}
// 执行
result.RunCode, result.RunDesc = executionCallback(ctx, task)
if result.RunCode == CodeAbnormal {
span.SetStatus(codes.Error, result.RunDesc)
}
if result.RunCode == CodeSuccess {
span.SetStatus(codes.Ok, result.RunDesc)
}
if result.RunCode == CodeError {
span.RecordError(fmt.Errorf(result.RunDesc), trace.WithStackTrace(true))
span.SetStatus(codes.Error, result.RunDesc)
}
// 运行编号
result.RunID = result.TraceID
if result.RunID == "" {
result.RunID = result.RequestID
if result.RunID == "" {
span.RecordError(fmt.Errorf("上下文没有运行编号"), trace.WithStackTrace(true))
span.SetStatus(codes.Error, "上下文没有运行编号")
if th.cfg.logIsDebug {
slog.ErrorContext(ctx, "RunSingleTask 上下文没有运行编号")
}
span.End() // 结束OpenTelemetry链路追踪
return
}
}
// OpenTelemetry链路追踪
span.SetAttributes(attribute.Int64("task.info.id", int64(task.ID)))
span.SetAttributes(attribute.String("task.info.status", task.Status))
span.SetAttributes(attribute.String("task.info.params", task.Params))
span.SetAttributes(attribute.Int64("task.info.number", task.Number))
span.SetAttributes(attribute.Int64("task.info.max_number", task.MaxNumber))
span.SetAttributes(attribute.String("task.info.custom_id", task.CustomID))
span.SetAttributes(attribute.Int64("task.info.custom_sequence", task.CustomSequence))
span.SetAttributes(attribute.String("task.info.type", task.Type))
span.SetAttributes(attribute.String("task.run.id", result.RunID))
span.SetAttributes(attribute.Int("task.run.code", result.RunCode))
span.SetAttributes(attribute.String("task.run.desc", result.RunDesc))
// 执行更新回调函数
if updateCallback != nil {
updateCallback(ctx, task, &result)
}
}
span.End() // 结束OpenTelemetry链路追踪
return
}
// EndRunTaskList 结束运行任务列表并停止OpenTelemetry链路追踪
func (th *TaskHelper) EndRunTaskList() {
if th.Span != nil {
th.Span.End()
}
return
}