-
Notifications
You must be signed in to change notification settings - Fork 24
/
task.go
98 lines (82 loc) · 1.98 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package job
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/douyu/jupiter/pkg/xlog"
)
type (
Task struct {
TaskID uint64
job *Job
executedAt time.Time
finishedAt *time.Time
}
TaskOption func(t *Task)
CronTaskStatus string
TaskResult struct {
TaskID uint64 `json:"task_id"`
Status CronTaskStatus `json:"status"`
Job *Job `json:"job"`
Logs string `json:"logs"`
RunOn string `json:"run_on"`
ExecutedAt time.Time `json:"executed_at"`
FinishedAt *time.Time `json:"finished_at"`
}
)
var (
CronTaskStatusProcessing CronTaskStatus = "processing"
CronTaskStatusSuccess CronTaskStatus = "success"
CronTaskStatusFailed CronTaskStatus = "failed"
CronTaskStatusTimeout CronTaskStatus = "timeout"
)
func NewTask(job *Job, ops ...TaskOption) *Task {
task := &Task{
job: job,
executedAt: time.Now(),
}
for _, op := range ops {
op(task)
}
if task.TaskID == 0 {
id, _ := job.Worker.taskIdGen.NextID()
task.TaskID = id
}
return task
}
func (t *Task) SetStatus(status CronTaskStatus, logs string) error {
if status == CronTaskStatusSuccess || status == CronTaskStatusFailed || status == CronTaskStatusTimeout {
now := time.Now()
t.finishedAt = &now
}
payload := TaskResult{
TaskID: t.TaskID,
Job: t.job,
Status: status,
Logs: logs,
RunOn: t.job.HostName,
ExecutedAt: t.executedAt,
FinishedAt: t.finishedAt,
}
payloadBytes, _ := json.Marshal(&payload)
_, err := t.job.Client.Put(context.Background(),
t.Key(),
string(payloadBytes),
)
return err
}
func (t *Task) Key() string {
return fmt.Sprintf("%s%s/%d", ResultKeyPrefix, t.job.ID, t.TaskID)
}
func (t *Task) Stop() {
_, err := t.job.Client.Delete(context.Background(), t.Key())
if err != nil {
t.job.logger.Error("delete task result failed", xlog.FieldErr(err))
}
}
func WithTaskID(taskId uint64) TaskOption {
return func(t *Task) {
t.TaskID = taskId
}
}