-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor_subtask_timeout.go
135 lines (108 loc) · 3.01 KB
/
monitor_subtask_timeout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package scheduler
import (
"context"
"strconv"
"time"
"github.com/go-redis/redis/v8"
"github.com/golang/glog"
"github.com/danenmao/pterergate-dtf/dtf/taskmodel"
"github.com/danenmao/pterergate-dtf/internal/config"
"github.com/danenmao/pterergate-dtf/internal/redistool"
"github.com/danenmao/pterergate-dtf/internal/subtasktool"
)
// MonitorTimeoutSubtask performs a single monitoring pass over timed-out
// subtasks: it collects their IDs with getTimeoutSubtasks and, when any were
// found, passes them to repairTimeoutSubtasks.
//
// Nothing is returned; a failure in either step is logged as a warning and
// ends the pass.
func MonitorTimeoutSubtask() {
	// Collect the subtasks that have timed out.
	var timedOut []uint64
	if err := getTimeoutSubtasks(&timedOut); err != nil {
		glog.Warning("failed to get timeout subtasks: ", err)
		return
	}

	// Nothing timed out, so there is nothing to repair.
	if len(timedOut) == 0 {
		return
	}

	// Handle the timed-out subtasks.
	if err := repairTimeoutSubtasks(&timedOut); err != nil {
		glog.Warning("failed to repair timeout subtask: ", err)
	}
}
func getTimeoutSubtasks(subtaskList *[]uint64) error {
if subtaskList == nil {
panic("invalid subtaskList pointer")
}
// 从redis_subtask_scanning_zset 中取超时的子任务
now := time.Now().Unix()
nowStr := strconv.FormatUint(uint64(now), 10)
opt := redis.ZRangeBy{
Min: "-inf", Max: nowStr,
Offset: 0, Count: 100,
}
cmd := redistool.DefaultRedis().ZRangeByScore(
context.Background(), config.RunningSubtaskZset, &opt,
)
err := cmd.Err()
if err != nil {
glog.Warning("failed to get timeout subtask from redis: ", err)
return err
}
strList := cmd.Val()
if len(strList) > 0 {
glog.Info("got timeout subtask: ", strList)
}
// 转换查询到的子任务ID
for _, str := range strList {
id, err := strconv.ParseUint(str, 10, 64)
if err != nil {
glog.Warning("failed to convert timeout subtask id: ", str)
continue
}
*subtaskList = append(*subtaskList, id)
}
// no timeout subtask
if len(*subtaskList) == 0 {
glog.Info("get empty list, no timeout subtask")
return nil
}
glog.Info("got timeout subtasks: ", *subtaskList)
return nil
}
// repairTimeoutSubtasks repairs the state of timed-out subtasks.
//
// It first asks redistool.TryToOwnElements to claim the IDs in *subtaskList
// from config.RunningSubtaskZset; only the IDs returned as owned are processed.
// For each owned ID it queues, on one Redis pipeline, a timeout result
// (taskmodel.SubtaskResult_Timeout) and a ZAdd into
// config.CompletedSubtaskList scored with the current Unix time, then executes
// the pipeline.
//
// It returns the error from TryToOwnElements or from the pipeline execution.
// A failure to queue the result of a single subtask is only logged. A nil
// subtaskList panics.
func repairTimeoutSubtasks(subtaskList *[]uint64) error {
	if subtaskList == nil {
		panic("invalid subtaskList pointer")
	}

	// Claim the subtasks from the running-subtask set.
	// NOTE(review): the original comment says this removes them from the
	// running list; confirm against redistool.TryToOwnElements.
	owned := []uint64{}
	if err := redistool.TryToOwnElements(config.RunningSubtaskZset, subtaskList, &owned); err != nil {
		return err
	}

	if len(owned) == 0 {
		return nil
	}

	ctx := context.Background()
	completeTime := time.Now().Unix()
	pipeline := redistool.DefaultRedis().Pipeline()

	for _, id := range owned {
		glog.Info("owned subtask, set subtask to complete: ", id)

		// Set the completion code to timeout. A failure here is logged and the
		// subtask is still queued as completed, as in the original flow.
		if err := subtasktool.SetSubtaskResult(id, taskmodel.SubtaskResult_Timeout, "", &pipeline); err != nil {
			glog.Warning("failed to set subtask timeout: ", id, err)
		}

		// Insert the subtask into the completed-subtask queue.
		z := redis.Z{
			Member: id,
			Score:  float64(completeTime),
		}
		pipeline.ZAdd(ctx, config.CompletedSubtaskList, &z)
	}

	if _, err := pipeline.Exec(ctx); err != nil {
		glog.Warning("failed to exec pipeline: ", err)
		return err
	}

	// Report only the subtasks this call actually claimed and repaired, not
	// every candidate that was passed in.
	glog.Info("succeeded to repair timeout subtasks: ", owned)
	return nil
}