-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.custom.helper.go
270 lines (211 loc) · 8.9 KB
/
task.custom.helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package gojobs
import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"go.dtapp.net/gojson"
"go.dtapp.net/gorequest"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"log/slog"
"time"
)
type TaskCustomHelper struct {
cfg *taskHelperConfig // 配置
taskType string // [任务]类型
taskList []*TaskCustomHelperTaskList // [任务]列表
Ctx context.Context // [启动]上下文
Span trace.Span // [启动]链路追踪
}
// NewTaskCustomHelper 任务帮助
// rootCtx 链路追踪的上下文
// taskType 任务类型
// logIsDebug 日志是否启动
// traceIsFilter 链路追踪是否过滤
func NewTaskCustomHelper(rootCtx context.Context, taskType string, opts ...TaskHelperOption) *TaskCustomHelper {
th := &TaskCustomHelper{
taskType: taskType,
}
// 配置
th.cfg = newTaskHelperConfig(opts)
if gorequest.GetRequestIDContext(rootCtx) == "" {
rootCtx = gorequest.SetRequestIDContext(rootCtx)
}
// 启动OpenTelemetry链路追踪
th.Ctx, th.Span = NewTraceStartSpan(rootCtx, th.taskType)
th.Span.SetAttributes(attribute.String("task.help.helper", "custom"))
th.Span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(th.Ctx)))
th.Span.SetAttributes(attribute.String("task.new.type", th.taskType))
th.Span.SetAttributes(attribute.Bool("task.cfg.logIsDebug", th.cfg.logIsDebug))
th.Span.SetAttributes(attribute.Bool("task.cfg.traceIsFilter", th.cfg.traceIsFilter))
th.Span.SetAttributes(attribute.String("task.cfg.traceIsFilterKeyName", th.cfg.traceIsFilterKeyName))
th.Span.SetAttributes(attribute.String("task.cfg.traceIsFilterKeyValue", th.cfg.traceIsFilterKeyValue))
return th
}
// QueryTaskList 通过回调函数获取任务列表
// rootCtx 链路追踪的上下文
// isRunCallback 任务列表回调函数 返回 是否使用 任务列表
// listCallback 任务回调函数 返回 任务列表
// newTaskLists 新的任务列表
// isContinue 是否继续
func (th *TaskCustomHelper) QueryTaskList(rootCtx context.Context, isRunCallback func(ctx context.Context, keyName string) (isUse bool, result *redis.StringCmd), listCallback func(ctx context.Context, taskType string) []*TaskCustomHelperTaskList) (isContinue bool) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "QueryTaskList")
span.SetAttributes(attribute.String("task.help.helper", "custom"))
span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(ctx)))
// 任务列表回调函数
if isRunCallback != nil {
// 执行
isRunUse, isRunResult := isRunCallback(ctx, GetRedisKeyName(th.taskType))
if isRunUse {
if isRunResult.Err() != nil {
if errors.Is(isRunResult.Err(), redis.Nil) {
err := fmt.Errorf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error())
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error()))
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
err := fmt.Errorf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error())
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error()))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
if isRunResult.Val() == "" {
err := fmt.Errorf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val())
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val()))
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
}
}
// 执行任务列表回调函数
if listCallback != nil {
th.taskList = listCallback(ctx, th.taskType)
}
// 没有任务需要执行
if len(th.taskList) <= 0 {
if th.cfg.logIsDebug {
slog.InfoContext(ctx, "QueryTaskList 没有任务需要执行")
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
// OpenTelemetry链路追踪
span.SetAttributes(attribute.String("task.list.list", gojson.JsonEncodeNoError(th.taskList)))
span.SetAttributes(attribute.Int("task.list.count", len(th.taskList)))
span.End() // 结束OpenTelemetry链路追踪
return true
}
// GetTaskList 请在QueryTaskList之后获取任务列表
func (th *TaskCustomHelper) GetTaskList() []*TaskCustomHelperTaskList {
return th.taskList
}
// RunMultipleTask 运行多个任务
// rootCtx 链路追踪的上下文
// wait 等待时间(秒)
// executionCallback 执行任务回调函数
// startCallback 开始任务回调函数
// endCallback 结束任务回调函数
func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task *TaskCustomHelperTaskList) (err error), startCallback func(ctx context.Context, taskType string) (err error), endCallback func(ctx context.Context, taskType string)) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "RunMultipleTask")
span.SetAttributes(attribute.String("task.help.helper", "custom"))
span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(ctx)))
span.SetAttributes(attribute.Int64("task.multiple.wait", wait))
span.SetAttributes(attribute.String("task.multiple.list", gojson.JsonEncodeNoError(th.taskList)))
span.SetAttributes(attribute.Int("task.multiple.count", len(th.taskList)))
if th.cfg.logIsDebug {
slog.DebugContext(ctx, "RunMultipleTask 运行多个任务", slog.Int64("wait", wait))
}
// 执行开始任务回调函数
if startCallback != nil && endCallback != nil {
err := startCallback(ctx, th.taskType)
if err != nil {
err = fmt.Errorf("开始任务回调函数返回错误,无法继续运行: %s", err)
span.SetStatus(codes.Error, err.Error())
if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("开始任务回调函数返回错误,无法继续运行: %s", err))
}
// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
}
for _, vTask := range th.taskList {
// 运行单个任务
th.RunSingleTask(ctx, vTask, executionCallback)
// 等待 wait 秒
if wait > 0 {
time.Sleep(time.Duration(wait) * time.Second)
}
}
// 执行结束任务回调函数
if startCallback != nil && endCallback != nil {
endCallback(ctx, th.taskType)
}
span.End() // 结束OpenTelemetry链路追踪
return
}
// RunSingleTask 运行单个任务
// rootCtx 链路追踪的上下文
// task 任务
// executionCallback 执行任务回调函数
func (th *TaskCustomHelper) RunSingleTask(rootCtx context.Context, task *TaskCustomHelperTaskList, executionCallback func(ctx context.Context, task *TaskCustomHelperTaskList) (err error)) {
// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "RunSingleTask "+task.CustomID)
span.SetAttributes(attribute.String("task.help.helper", "custom"))
span.SetAttributes(attribute.String("task.help.request_id", gorequest.GetRequestIDContext(ctx)))
span.SetAttributes(attribute.String("task.single.info", gojson.JsonEncodeNoError(task)))
if th.cfg.logIsDebug {
slog.DebugContext(ctx, "RunSingleTask 运行单个任务", slog.String("task", gojson.JsonEncodeNoError(task)))
}
// 任务回调函数
if executionCallback != nil {
// 执行
err := executionCallback(ctx, task)
if err != nil {
span.RecordError(err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())
}
// OpenTelemetry链路追踪
span.SetAttributes(attribute.String("task.info.id", task.TaskID))
span.SetAttributes(attribute.String("task.info.name", task.TaskName))
span.SetAttributes(attribute.String("task.info.params", task.TaskParams))
span.SetAttributes(attribute.String("task.info.custom_id", task.CustomID))
}
span.End() // 结束OpenTelemetry链路追踪
return
}
// EndRunTaskList 结束运行任务列表并停止OpenTelemetry链路追踪
func (th *TaskCustomHelper) EndRunTaskList() {
if th.Span != nil {
th.Span.End()
}
return
}