Skip to content

Commit

Permalink
- update helper
Browse files Browse the repository at this point in the history
  • Loading branch information
dtapps committed Jun 17, 2024
1 parent f5af1b6 commit c0f6186
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
#### 安装

```shell
go get -v -u go.dtapp.net/gojobs@v1.0.149
go get -v -u go.dtapp.net/gojobs@v1.0.150
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/redis/go-redis/v9 v9.5.3
github.com/robfig/cron/v3 v3.0.1
go.dtapp.net/gojson v1.0.4
go.dtapp.net/gorequest v1.0.68
go.dtapp.net/gorequest v1.0.70
go.dtapp.net/gostring v1.0.15
go.dtapp.net/gotime v1.0.11
go.opentelemetry.io/otel v1.27.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ go.dtapp.net/gojson v1.0.4 h1:9en9iyOOLWoEIo2eKhqt3/Djh/3HhwsTpXxgI9efPRo=
go.dtapp.net/gojson v1.0.4/go.mod h1:G9CMVzNSRkbNzIic/vJqHCOyKtw6BW2YM8Vyn64zfM0=
go.dtapp.net/gorandom v1.0.3 h1:6RNDFMJfLlHFR98c2tbecaMzg4vEIHyEpduMBUdd0jc=
go.dtapp.net/gorandom v1.0.3/go.mod h1:Qd6ywCSrk7sCkh9OdR0wygcbQBFt53WDIrx8rqGBvis=
go.dtapp.net/gorequest v1.0.68 h1:KFH5mKJSs3gthbJj+IxDXn54qabPY4pBE6rqNjfvdyg=
go.dtapp.net/gorequest v1.0.68/go.mod h1:buYy18+d4jO3DbTl88BXyagilw5unU5FFDq5VfCehEg=
go.dtapp.net/gorequest v1.0.70 h1:1yxWhhBpxaSy1cCzyuAFYvoj5FJHam52dnkKs4aUIHg=
go.dtapp.net/gorequest v1.0.70/go.mod h1:buYy18+d4jO3DbTl88BXyagilw5unU5FFDq5VfCehEg=
go.dtapp.net/gostring v1.0.15 h1:MxvbgsBHSstIUdtwHf1FGtfh1bPbAlxxC+9NFVk396M=
go.dtapp.net/gostring v1.0.15/go.mod h1:AJLixiPhpBZrSKQ7yW46a+42iEIrOlfkJJ+Hta/j8RE=
go.dtapp.net/gotime v1.0.11 h1:VAmi1kFhkwJweIujeUOWf16wSmAZd4VyhyD25ku/J2Y=
Expand Down
44 changes: 36 additions & 8 deletions task.custom.helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type TaskCustomHelper struct {
}

// NewTaskCustomHelper 任务帮助
// ctx 链路追踪的上下文
// rootCtx 链路追踪的上下文
// taskType 任务类型
// logIsDebug 日志是否启动
// traceIsFilter 链路追踪是否过滤
Expand Down Expand Up @@ -58,6 +58,7 @@ func NewTaskCustomHelper(rootCtx context.Context, taskType string, opts ...TaskH
}

// QueryTaskList 通过回调函数获取任务列表
// rootCtx 链路追踪的上下文
// isRunCallback 任务列表回调函数 返回 是否使用 任务列表
// listCallback 任务回调函数 返回 任务列表
// newTaskLists 新的任务列表
Expand All @@ -79,11 +80,11 @@ func (th *TaskCustomHelper) QueryTaskList(rootCtx context.Context, isRunCallback
if isRunUse {
if isRunResult.Err() != nil {
if errors.Is(isRunResult.Err(), redis.Nil) {
err := fmt.Errorf("查询redis的key不存在,根据设置,无法继续运行: %v", isRunResult.Err().Error())
err := fmt.Errorf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error())
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, "查询redis的key不存在,根据设置,无法继续运行", slog.String("key", GetRedisKeyName(th.taskType)), slog.String("err", isRunResult.Err().Error()))
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error()))
}

// 过滤
Expand All @@ -93,20 +94,20 @@ func (th *TaskCustomHelper) QueryTaskList(rootCtx context.Context, isRunCallback
return
}

err := fmt.Errorf("查询redis的key异常,无法继续运行: %v", isRunResult.Err().Error())
err := fmt.Errorf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error())
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, "QueryTaskList 查询redis的key异常,无法继续运行", slog.String("err", isRunResult.Err().Error()))
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error()))
}
return
}
if isRunResult.Val() == "" {
err := fmt.Errorf("查询redis的key内容为空,根据配置,无法继续运行: %s", isRunResult.Val())
err := fmt.Errorf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val())
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, "QueryTaskList 查询redis的key内容为空,根据配置,无法继续运行", slog.String("val", isRunResult.Val()))
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val()))
}

// 过滤
Expand Down Expand Up @@ -150,8 +151,12 @@ func (th *TaskCustomHelper) GetTaskList() []TaskCustomHelperTaskList {
}

// RunMultipleTask 运行多个任务
// rootCtx 链路追踪的上下文
// wait 等待时间(秒)
// executionCallback 执行任务回调函数
func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task TaskCustomHelperTaskList) (err error)) {
// startCallback 开始任务回调函数
// endCallback 结束任务回调函数
func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task TaskCustomHelperTaskList) (err error), startCallback func(ctx context.Context, taskType string) (err error), endCallback func(ctx context.Context, taskType string)) {

// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "RunMultipleTask")
Expand All @@ -168,6 +173,28 @@ func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64,
slog.DebugContext(ctx, "RunMultipleTask 运行多个任务", slog.Int64("wait", wait))
}

if startCallback != nil && endCallback != nil {

// 开始任务回调函数
err := startCallback(ctx, th.taskType)
if err != nil {
err = fmt.Errorf("开始任务回调函数返回错误,无法继续运行: %s", err)
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("开始任务回调函数返回错误,无法继续运行: %s", err))
}

// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
return
}
defer endCallback(ctx, th.taskType)

}

for _, vTask := range th.taskList {

// 运行单个任务
Expand All @@ -184,6 +211,7 @@ func (th *TaskCustomHelper) RunMultipleTask(rootCtx context.Context, wait int64,
}

// RunSingleTask 运行单个任务
// rootCtx 链路追踪的上下文
// task 任务
// executionCallback 执行任务回调函数
func (th *TaskCustomHelper) RunSingleTask(rootCtx context.Context, task TaskCustomHelperTaskList, executionCallback func(ctx context.Context, task TaskCustomHelperTaskList) (err error)) {
Expand Down
48 changes: 37 additions & 11 deletions task.helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewTaskHelper(rootCtx context.Context, taskType string, opts ...TaskHelperO
}

// QueryTaskList 通过回调函数获取任务列表
// rootCtx 链路追踪的上下文
// isRunCallback 任务列表回调函数 返回 是否使用 任务列表
// listCallback 任务回调函数 返回 任务列表
// newTaskLists 新的任务列表
Expand All @@ -77,12 +78,12 @@ func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func(
if isRunUse {
if isRunResult.Err() != nil {
if errors.Is(isRunResult.Err(), redis.Nil) {
err := fmt.Errorf("查询redis的key不存在,根据设置,无法继续运行: %v", isRunResult.Err().Error())
err := fmt.Errorf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error())
//th.listSpan.RecordError( err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, "查询redis的key不存在,根据设置,无法继续运行", slog.String("key", GetRedisKeyName(th.taskType)), slog.String("err", isRunResult.Err().Error()))
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回不存在,无法继续运行: %v@%v", GetRedisKeyName(th.taskType), isRunResult.Err().Error()))
}

// 过滤
Expand All @@ -92,30 +93,28 @@ func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func(
return
}

err := fmt.Errorf("查询redis的key异常,无法继续运行: %v", isRunResult.Err().Error())
err := fmt.Errorf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error())
//th.listSpan.RecordError( err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, "QueryTaskList 查询redis的key异常,无法继续运行", slog.String("err", isRunResult.Err().Error()))
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回错误,无法继续运行: %v", isRunResult.Err().Error()))
}

return
}
if isRunResult.Val() == "" {
err := fmt.Errorf("查询redis的key内容为空,根据配置,无法继续运行: %s", isRunResult.Val())
err := fmt.Errorf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val())
//th.listSpan.RecordError(err, trace.WithStackTrace(true))
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, "QueryTaskList 查询redis的key内容为空,根据配置,无法继续运行", slog.String("val", isRunResult.Val()))
slog.DebugContext(ctx, fmt.Sprintf("执行任务列表回调函数返回空,无法继续运行: %s", isRunResult.Val()))
}

// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}

return
}
}
Expand Down Expand Up @@ -155,6 +154,7 @@ func (th *TaskHelper) QueryTaskList(rootCtx context.Context, isRunCallback func(
}

// FilterTaskList 过滤任务列表
// rootCtx 链路追踪的上下文
// isMandatoryIp 强制当前ip
// specifyIp 指定Ip
// isContinue 是否继续
Expand Down Expand Up @@ -241,7 +241,6 @@ func (th *TaskHelper) FilterTaskList(rootCtx context.Context, isMandatoryIp bool
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}

return
}

Expand All @@ -253,14 +252,18 @@ func (th *TaskHelper) FilterTaskList(rootCtx context.Context, isMandatoryIp bool
}

// GetTaskList 请在FilterTaskList之后获取任务列表
func (th *TaskHelper) GetTaskList(rootCtx context.Context) []GormModelTask {
func (th *TaskHelper) GetTaskList() []GormModelTask {
return th.taskList
}

// RunMultipleTask 运行多个任务
// rootCtx 链路追踪的上下文
// wait 等待时间(秒)
// executionCallback 执行任务回调函数 返回 runCode=状态 runDesc=描述
// updateCallback 执行更新回调函数
func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task GormModelTask) (runCode int, runDesc string), updateCallback func(ctx context.Context, task GormModelTask, result TaskHelperRunSingleTaskResponse)) {
// startCallback 开始任务回调函数
// endCallback 结束任务回调函数
func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, executionCallback func(ctx context.Context, task GormModelTask) (runCode int, runDesc string), updateCallback func(ctx context.Context, task GormModelTask, result TaskHelperRunSingleTaskResponse), startCallback func(ctx context.Context, taskType string) (err error), endCallback func(ctx context.Context, taskType string)) {

// 启动OpenTelemetry链路追踪
ctx, span := NewTraceStartSpan(rootCtx, "RunMultipleTask")
Expand All @@ -277,6 +280,28 @@ func (th *TaskHelper) RunMultipleTask(rootCtx context.Context, wait int64, execu
slog.DebugContext(ctx, "RunMultipleTask 运行多个任务", slog.Int64("wait", wait))
}

if startCallback != nil && endCallback != nil {

// 开始任务回调函数
err := startCallback(ctx, th.taskType)
if err != nil {
err = fmt.Errorf("开始任务回调函数返回错误,无法继续运行: %s", err)
span.SetStatus(codes.Error, err.Error())

if th.cfg.logIsDebug {
slog.DebugContext(ctx, fmt.Sprintf("开始任务回调函数返回错误,无法继续运行: %s", err))
}

// 过滤
if th.cfg.traceIsFilter {
span.SetAttributes(attribute.String(th.cfg.traceIsFilterKeyName, th.cfg.traceIsFilterKeyValue))
}
return
}
defer endCallback(ctx, th.taskType)

}

for _, vTask := range th.taskList {
// 运行单个任务
th.RunSingleTask(ctx, vTask, executionCallback, updateCallback)
Expand All @@ -301,6 +326,7 @@ type TaskHelperRunSingleTaskResponse struct {
}

// RunSingleTask 运行单个任务
// rootCtx 链路追踪的上下文
// task 任务
// executionCallback 执行任务回调函数 返回 runCode=状态 runDesc=描述
// updateCallback 执行更新回调函数
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package gojobs

const (
Version = "1.0.149"
Version = "1.0.150"
)

0 comments on commit c0f6186

Please sign in to comment.