diff --git a/README.md b/README.md index 9d69b45..4926b60 100644 --- a/README.md +++ b/README.md @@ -13,5 +13,5 @@ #### 安装 ```shell -go get -v -u go.dtapp.net/gojobs@v1.0.149 +go get -v -u go.dtapp.net/gojobs@v1.0.150 ``` \ No newline at end of file diff --git a/go.mod b/go.mod index de47ea1..566a59d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/redis/go-redis/v9 v9.5.3 github.com/robfig/cron/v3 v3.0.1 go.dtapp.net/gojson v1.0.4 - go.dtapp.net/gorequest v1.0.68 + go.dtapp.net/gorequest v1.0.70 go.dtapp.net/gostring v1.0.15 go.dtapp.net/gotime v1.0.11 go.opentelemetry.io/otel v1.27.0 diff --git a/go.sum b/go.sum index b2c9dca..68a4714 100644 --- a/go.sum +++ b/go.sum @@ -72,8 +72,8 @@ go.dtapp.net/gojson v1.0.4 h1:9en9iyOOLWoEIo2eKhqt3/Djh/3HhwsTpXxgI9efPRo= go.dtapp.net/gojson v1.0.4/go.mod h1:G9CMVzNSRkbNzIic/vJqHCOyKtw6BW2YM8Vyn64zfM0= go.dtapp.net/gorandom v1.0.3 h1:6RNDFMJfLlHFR98c2tbecaMzg4vEIHyEpduMBUdd0jc= go.dtapp.net/gorandom v1.0.3/go.mod h1:Qd6ywCSrk7sCkh9OdR0wygcbQBFt53WDIrx8rqGBvis= -go.dtapp.net/gorequest v1.0.68 h1:KFH5mKJSs3gthbJj+IxDXn54qabPY4pBE6rqNjfvdyg= -go.dtapp.net/gorequest v1.0.68/go.mod h1:buYy18+d4jO3DbTl88BXyagilw5unU5FFDq5VfCehEg= +go.dtapp.net/gorequest v1.0.70 h1:1yxWhhBpxaSy1cCzyuAFYvoj5FJHam52dnkKs4aUIHg= +go.dtapp.net/gorequest v1.0.70/go.mod h1:buYy18+d4jO3DbTl88BXyagilw5unU5FFDq5VfCehEg= go.dtapp.net/gostring v1.0.15 h1:MxvbgsBHSstIUdtwHf1FGtfh1bPbAlxxC+9NFVk396M= go.dtapp.net/gostring v1.0.15/go.mod h1:AJLixiPhpBZrSKQ7yW46a+42iEIrOlfkJJ+Hta/j8RE= go.dtapp.net/gotime v1.0.11 h1:VAmi1kFhkwJweIujeUOWf16wSmAZd4VyhyD25ku/J2Y= diff --git a/task.custom.helper.go b/task.custom.helper.go index 47d7d34..10e2c2b 100644 --- a/task.custom.helper.go +++ b/task.custom.helper.go @@ -25,7 +25,7 @@ type TaskCustomHelper struct { } // NewTaskCustomHelper 任务帮助 -// ctx 链路追踪的上下文 +// rootCtx 链路追踪的上下文 // taskType 任务类型 // logIsDebug 日志是否启动 // traceIsFilter 链路追踪是否过滤 @@ -58,6 +58,7 @@ func NewTaskCustomHelper(rootCtx context.Context, taskType string, opts ...TaskH } // QueryTaskList 通过回调函数获取任务列表 +// rootCtx 链路追踪的上下文 // isRunCallback 任务列表回调函数 返回 是否使用 任务列表 // listCallback 任务回调函数 返回 任务列表 // newTaskLists 新的任务列表 @@ -79,11 +80,11 @@ func (th *TaskCustomHelper) QueryTaskList(rootCtx context.Context, isRunCallback if isRunUse { if isRunResult.Err() != nil { if errors.Is(isRunResult.Err(), redis.Nil) { - err := fmt.Errorf("查询redis的key不存在,根据设置,无法继续运行: %v", isRunResult.Err().Error()) + err := fmt.Errorf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error()) span.SetStatus(codes.Error, err.Error()) if th.cfg.logIsDebug { - slog.DebugContext(ctx, "查询redis的key不存在,根据设置,无法继续运行", slog.String("key", GetRedisKeyName(th.taskType)), slog.String("err", isRunResult.Err().Error())) + slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error())) } // 过滤 @@ -93,20 +94,20 @@ func (th *TaskCustomHelper) QueryTaskList(rootCtx context.Context, isRunCallback return } - err := fmt.Errorf("查询redis的key异常,无法继续运行: %v", isRunResult.Err().Error()) + err := fmt.Errorf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error()) span.SetStatus(codes.Error, err.Error()) if th.cfg.logIsDebug { - slog.DebugContext(ctx, "QueryTaskList 查询redis的key异常,无法继续运行", slog.String("err", isRunResult.Err().Error())) + slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error())) } return } if isRunResult.Val() == "" { - err := fmt.Errorf("查询redis的key内容为空,根据配置,无法继续运行: %s", isRunResult.Val()) + err := fmt.Errorf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val()) span.SetStatus(codes.Error, err.Error()) if th.cfg.logIsDebug { - slog.DebugContext(ctx, "QueryTaskList 查询redis的key内容为空,根据配置,无法继续运行", slog.String("val", isRunResult.Val())) + slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val())) } // 过滤 @@ -150,8 +151,12 @@ func (th *TaskCustomHelper) GetTaskList() []TaskCustomHelperTaskList { } // RunMultipleTask 运行多个任务 +// rootCtx 链路追踪的上下文 +// wait 等待时间(秒) // executionCallback 执行任务回调函数 -func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task TaskCustomHelperTaskList) (err error)) { +// startCallback 开始任务回调函数 +// endCallback 结束任务回调函数 +func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task TaskCustomHelperTaskList) (err error), startCallback func(ctx context.Context, taskType string) (err error), endCallback func(ctx context.Context, taskType string)) { // 启动OpenTelemetry链路追踪 ctx, span := NewTraceStartSpan(rootCtx, "RunMultipleTask") @@ -168,6 +173,28 @@ func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, slog.DebugContext(ctx, "RunMultipleTask 运行多个任务", slog.Int64("wait", wait)) } + if startCallback != nil && endCallback != nil { + + // 开始任务回调函数 + err := startCallback(ctx, th.taskType) + if err != nil { + err = fmt.Errorf("开始任务回调函数返回错误,无法继续运行: %s", err) + span.SetStatus(codes.Error, err.Error()) + + if th.cfg.logIsDebug { + slog.DebugContext(ctx, fmt.Sprintf("开始任务回调函数返回错误,无法继续运行: %s", err)) + } + + // 过滤 + if th.cfg.traceIsFilter { + span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue)) + } + return + } + defer endCallback(ctx, th.taskType) + + } + for _, vTask := range th.taskList { // 运行单个任务 @@ -184,6 +211,7 @@ func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, } // RunSingleTask 运行单个任务 +// rootCtx 链路追踪的上下文 // task 任务 // executionCallback 执行任务回调函数 func (th *TaskCustomHelper) RunSingleTask(rootCtx context.Context, task TaskCustomHelperTaskList, executionCallback func(ctx context.Context, task TaskCustomHelperTaskList) (err error)) { diff --git a/task.helper.go b/task.helper.go index 138727a..85a4889 100644 --- a/task.helper.go +++ b/task.helper.go @@ -59,6 +59,7 @@ func NewTaskHelper(rootCtx context.Context, taskType string, opts ...TaskHelperO } // QueryTaskList 通过回调函数获取任务列表 +// rootCtx 链路追踪的上下文 // isRunCallback 任务列表回调函数 返回 是否使用 任务列表 // listCallback 任务回调函数 返回 任务列表 // newTaskLists 新的任务列表 @@ -77,12 +78,12 @@ func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func( if isRunUse { if isRunResult.Err() != nil { if errors.Is(isRunResult.Err(), redis.Nil) { - err := fmt.Errorf("查询redis的key不存在,根据设置,无法继续运行: %v", isRunResult.Err().Error()) + err := fmt.Errorf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error()) //th.listSpan.RecordError( err, trace.WithStackTrace(true)) span.SetStatus(codes.Error, err.Error()) if th.cfg.logIsDebug { - slog.DebugContext(ctx, "查询redis的key不存在,根据设置,无法继续运行", slog.String("key", GetRedisKeyName(th.taskType)), slog.String("err", isRunResult.Err().Error())) + slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error())) } // 过滤 @@ -92,30 +93,28 @@ func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func( return } - err := fmt.Errorf("查询redis的key异常,无法继续运行: %v", isRunResult.Err().Error()) + err := fmt.Errorf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error()) //th.listSpan.RecordError( err, trace.WithStackTrace(true)) span.SetStatus(codes.Error, err.Error()) if th.cfg.logIsDebug { - slog.DebugContext(ctx, "QueryTaskList 查询redis的key异常,无法继续运行", slog.String("err", isRunResult.Err().Error())) + slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error())) } - return } if isRunResult.Val() == "" { - err := fmt.Errorf("查询redis的key内容为空,根据配置,无法继续运行: %s", isRunResult.Val()) + err := fmt.Errorf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val()) //th.listSpan.RecordError(err, trace.WithStackTrace(true)) span.SetStatus(codes.Error, err.Error()) if th.cfg.logIsDebug { - slog.DebugContext(ctx, "QueryTaskList 查询redis的key内容为空,根据配置,无法继续运行", slog.String("val", isRunResult.Val())) + slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val())) } // 过滤 if th.cfg.traceIsFilter { span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue)) } - return } } @@ -155,6 +154,7 @@ func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func( } // FilterTaskList 过滤任务列表 +// rootCtx 链路追踪的上下文 // isMandatoryIp 强制当前ip // specifyIp 指定Ip // isContinue 是否继续 @@ -241,7 +241,6 @@ func (th *TaskHelper) FilterTaskList(rootCtx context.Context, isMandatoryIp bool if th.cfg.traceIsFilter { span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue)) } - return } @@ -253,14 +252,18 @@ func (th *TaskHelper) FilterTaskList(rootCtx context.Context, isMandatoryIp bool } // GetTaskList 请在FilterTaskList之后获取任务列表 -func (th *TaskHelper) GetTaskList(rootCtx context.Context) []GormModelTask { +func (th *TaskHelper) GetTaskList() []GormModelTask { return th.taskList } // RunMultipleTask 运行多个任务 +// rootCtx 链路追踪的上下文 +// wait 等待时间(秒) // executionCallback 执行任务回调函数 返回 runCode=状态 runDesc=描述 // updateCallback 执行更新回调函数 -func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task GormModelTask) (runCode int, runDesc string), updateCallback func(ctx context.Context, task GormModelTask, result TaskHelperRunSingleTaskResponse)) { +// startCallback 开始任务回调函数 +// endCallback 结束任务回调函数 +func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task GormModelTask) (runCode int, runDesc string), updateCallback func(ctx context.Context, task GormModelTask, result TaskHelperRunSingleTaskResponse), startCallback func(ctx context.Context, taskType string) (err error), endCallback func(ctx context.Context, taskType string)) { // 启动OpenTelemetry链路追踪 ctx, span := NewTraceStartSpan(rootCtx, "RunMultipleTask") @@ -277,6 +280,28 @@ func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, execu slog.DebugContext(ctx, "RunMultipleTask 运行多个任务", slog.Int64("wait", wait)) } + if startCallback != nil && endCallback != nil { + + // 开始任务回调函数 + err := startCallback(ctx, th.taskType) + if err != nil { + err = fmt.Errorf("开始任务回调函数返回错误,无法继续运行: %s", err) + span.SetStatus(codes.Error, err.Error()) + + if th.cfg.logIsDebug { + slog.DebugContext(ctx, fmt.Sprintf("开始任务回调函数返回错误,无法继续运行: %s", err)) + } + + // 过滤 + if th.cfg.traceIsFilter { + span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue)) + } + return + } + defer endCallback(ctx, th.taskType) + + } + for _, vTask := range th.taskList { // 运行单个任务 th.RunSingleTask(ctx, vTask, executionCallback, updateCallback) @@ -301,6 +326,7 @@ type TaskHelperRunSingleTaskResponse struct { } // RunSingleTask 运行单个任务 +// rootCtx 链路追踪的上下文 // task 任务 // executionCallback 执行任务回调函数 返回 runCode=状态 runDesc=描述 // updateCallback 执行更新回调函数 diff --git a/version.go b/version.go index 9bbe56e..113a255 100644 --- a/version.go +++ b/version.go @@ -1,5 +1,5 @@ package gojobs const ( - Version = "1.0.149" + Version = "1.0.150" )