forked from RichardKnop/machinery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.go
107 lines (94 loc) · 2.89 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package tasks
import "time"
// Names of the states a task can be in. They are untyped string constants
// and are the values stored in TaskState.State.
const (
	// StatePending is the initial state of a task.
	StatePending = "PENDING"

	// StateReceived marks a task that has been received by a worker.
	StateReceived = "RECEIVED"

	// StateStarted marks a task that a worker has started processing.
	StateStarted = "STARTED"

	// StateRetry marks a failed task that has been scheduled for retry.
	StateRetry = "RETRY"

	// StateSuccess marks a task that was processed successfully.
	StateSuccess = "SUCCESS"

	// StateFailure marks a task whose processing failed.
	StateFailure = "FAILURE"
)
// TaskState is a snapshot of the state of a single task.
//
// Each field carries a bson struct tag; the task UUID is tagged "_id".
type TaskState struct {
	// TaskUUID identifies the task this state belongs to.
	TaskUUID string `bson:"_id"`
	// TaskName is the name of the task.
	TaskName string `bson:"task_name"`
	// State holds one of the State* constants (e.g. StatePending).
	State string `bson:"state"`
	// Results holds the task results; only NewSuccessTaskState populates it.
	Results []*TaskResult `bson:"results"`
	// Error holds the failure message; only NewFailureTaskState populates it.
	Error string `bson:"error"`
	// CreatedAt is the creation time; only NewPendingTaskState populates it.
	CreatedAt time.Time `bson:"created_at"`
}
// GroupMeta holds metadata about the tasks that belong to one group.
//
// For example, the UUIDs of all tasks in the group are kept so that it can be
// checked whether every task completed successfully, and therefore whether
// the chord callback should be triggered.
type GroupMeta struct {
	// GroupUUID identifies the group; it is tagged as the "_id" key.
	GroupUUID string `bson:"_id"`
	// TaskUUIDs lists the UUIDs of the tasks in the group.
	TaskUUIDs []string `bson:"task_uuids"`
	// ChordTriggered presumably records that the chord callback has already
	// been triggered - confirm against the backends that set it.
	ChordTriggered bool `bson:"chord_triggered"`
	// Lock is a boolean flag; NOTE(review): its exact locking semantics are
	// defined by the code that reads/writes it, not visible here.
	Lock bool `bson:"lock"`
	// CreatedAt is the time the group metadata was created.
	CreatedAt time.Time `bson:"created_at"`
}
// NewPendingTaskState builds the PENDING state for the task described by
// signature. It copies the signature's UUID and name and stamps CreatedAt
// with the current time in UTC.
func NewPendingTaskState(signature *Signature) *TaskState {
	pending := TaskState{
		TaskUUID: signature.UUID,
		TaskName: signature.Name,
		State:    StatePending,
	}
	pending.CreatedAt = time.Now().UTC()
	return &pending
}
// NewReceivedTaskState builds the RECEIVED state for the task described by
// signature. Only TaskUUID and State are set; every other field keeps its
// zero value.
func NewReceivedTaskState(signature *Signature) *TaskState {
	received := TaskState{State: StateReceived}
	received.TaskUUID = signature.UUID
	return &received
}
// NewStartedTaskState builds the STARTED state for the task described by
// signature. Only TaskUUID and State are set; every other field keeps its
// zero value.
func NewStartedTaskState(signature *Signature) *TaskState {
	started := TaskState{State: StateStarted}
	started.TaskUUID = signature.UUID
	return &started
}
// NewSuccessTaskState builds the SUCCESS state for the task described by
// signature and attaches results to it. The results slice is stored as
// given (not copied). TaskName and CreatedAt are left at their zero values.
func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState {
	succeeded := TaskState{
		State:   StateSuccess,
		Results: results,
	}
	succeeded.TaskUUID = signature.UUID
	return &succeeded
}
// NewFailureTaskState builds the FAILURE state for the task described by
// signature, recording err as the failure message. TaskName and CreatedAt
// are left at their zero values.
func NewFailureTaskState(signature *Signature, err string) *TaskState {
	failed := TaskState{
		State: StateFailure,
		Error: err,
	}
	failed.TaskUUID = signature.UUID
	return &failed
}
// NewRetryTaskState builds the RETRY state for the task described by
// signature. Only TaskUUID and State are set; every other field keeps its
// zero value.
func NewRetryTaskState(signature *Signature) *TaskState {
	retrying := TaskState{State: StateRetry}
	retrying.TaskUUID = signature.UUID
	return &retrying
}
// IsCompleted reports whether the task has finished processing, i.e. its
// state is either SUCCESS or FAILURE.
func (ts *TaskState) IsCompleted() bool {
	if ts.IsSuccess() {
		return true
	}
	return ts.IsFailure()
}
// IsSuccess reports whether the task's state is SUCCESS.
func (ts *TaskState) IsSuccess() bool {
	return ts.State == StateSuccess
}
// IsFailure reports whether the task's state is FAILURE.
func (ts *TaskState) IsFailure() bool {
	return ts.State == StateFailure
}