forked from RichardKnop/machinery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflow.go
117 lines (91 loc) · 2.78 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package tasks
import (
"fmt"
"github.com/satori/go.uuid"
)
// Chain is a sequence of task signatures meant to be executed one after
// another.
type Chain struct {
	// Tasks holds the chained signatures, in the order they were passed to
	// NewChain.
	Tasks []*Signature
}
// Group is a set of task signatures meant to be executed in parallel.
type Group struct {
	// GroupUUID is the identifier shared by every task in the group; NewGroup
	// generates it in the form "group_<uuid>".
	GroupUUID string
	// Tasks holds the signatures that belong to the group.
	Tasks []*Signature
}
// Chord pairs a group of tasks with a callback that is meant to be executed
// after all tasks in the group have finished.
type Chord struct {
	// Group is the set of tasks the callback is attached to.
	Group *Group
	// Callback is the signature that NewChord registers as the ChordCallback
	// of every task in Group.
	Callback *Signature
}
// GetUUIDs returns the UUIDs of the group's tasks, in the same order as
// the Tasks slice. The result always has one entry per task.
func (g *Group) GetUUIDs() []string {
	uuids := make([]string, 0, len(g.Tasks))
	for idx := range g.Tasks {
		uuids = append(uuids, g.Tasks[idx].UUID)
	}
	return uuids
}
// NewChain creates a new chain of tasks to be processed one by one, passing
// results unless task signatures are set to be immutable.
//
// Every signature whose UUID is empty is assigned a generated identifier of
// the form "task_<uuid>". Each signature's OnSuccess is then replaced with a
// single-element slice holding the next signature in the chain; the last
// signature's OnSuccess is left untouched. The signatures are modified in
// place and the returned Chain refers to the same slice that was passed in.
//
// An error is returned if any signature is nil (checked before anything is
// modified) or if generating a UUID fails.
func NewChain(signatures ...*Signature) (*Chain, error) {
	// Reject nil entries up front so that a bad argument yields an error
	// rather than a nil pointer panic half way through the mutation below.
	for i, signature := range signatures {
		if signature == nil {
			return nil, fmt.Errorf("Error creating chain: signature at index %d is nil", i)
		}
	}

	// Auto generate task UUIDs if needed
	for _, signature := range signatures {
		if signature.UUID != "" {
			continue
		}
		signatureID, err := uuid.NewV4()
		if err != nil {
			return nil, fmt.Errorf("Error generating signature id: %s", err.Error())
		}
		signature.UUID = fmt.Sprintf("task_%v", signatureID)
	}

	// Link every task to its successor, walking from the tail to the head.
	// The loop condition already guarantees i-1 is a valid index.
	for i := len(signatures) - 1; i > 0; i-- {
		signatures[i-1].OnSuccess = []*Signature{signatures[i]}
	}

	return &Chain{Tasks: signatures}, nil
}
// NewGroup creates a new group of tasks to be processed in parallel.
//
// A group identifier of the form "group_<uuid>" is generated and written to
// the GroupUUID field of every signature, together with GroupTaskCount set
// to the number of signatures. Signatures whose UUID is empty are assigned a
// generated "task_<uuid>" identifier. The signatures are modified in place
// and the returned Group refers to the same slice that was passed in.
//
// An error is returned if any signature is nil (checked before anything is
// modified) or if generating a UUID fails.
func NewGroup(signatures ...*Signature) (*Group, error) {
	// Reject nil entries up front so that a bad argument yields an error
	// rather than a nil pointer panic half way through the mutation below.
	for i, signature := range signatures {
		if signature == nil {
			return nil, fmt.Errorf("Error creating group: signature at index %d is nil", i)
		}
	}

	// Generate a group UUID
	groupUUID, err := uuid.NewV4()
	if err != nil {
		return nil, fmt.Errorf("Error generating group uuid: %s", err.Error())
	}
	groupID := fmt.Sprintf("group_%v", groupUUID)
	taskCount := len(signatures)

	// Auto generate task UUIDs if needed, group tasks by common group UUID
	for _, signature := range signatures {
		if signature.UUID == "" {
			// Distinct error name: avoids shadowing the outer err.
			signatureID, idErr := uuid.NewV4()
			if idErr != nil {
				return nil, fmt.Errorf("Error generating signature id: %s", idErr.Error())
			}
			signature.UUID = fmt.Sprintf("task_%v", signatureID)
		}
		signature.GroupUUID = groupID
		signature.GroupTaskCount = taskCount
	}

	return &Group{
		GroupUUID: groupID,
		Tasks:     signatures,
	}, nil
}
// NewChord creates a new chord (a group of tasks with a single callback
// to be executed after all tasks in the group have completed).
//
// If the callback has an empty UUID it is assigned a generated identifier of
// the form "chord_<uuid>". The callback is then stored in the ChordCallback
// field of every task in the group; both the callback and the group's tasks
// are modified in place.
//
// An error is returned if group, callback, or any task in the group is nil
// (checked before anything is modified), or if generating a UUID fails.
func NewChord(group *Group, callback *Signature) (*Chord, error) {
	// Validate all pointers first so that bad arguments yield an error
	// instead of a nil pointer panic.
	if group == nil {
		return nil, fmt.Errorf("Error creating chord: group is nil")
	}
	if callback == nil {
		return nil, fmt.Errorf("Error creating chord: callback is nil")
	}
	for i, signature := range group.Tasks {
		if signature == nil {
			return nil, fmt.Errorf("Error creating chord: group task at index %d is nil", i)
		}
	}

	if callback.UUID == "" {
		// Generate a UUID for the chord callback
		callbackUUID, err := uuid.NewV4()
		if err != nil {
			return nil, fmt.Errorf("Error generating callback id: %s", err.Error())
		}
		callback.UUID = fmt.Sprintf("chord_%v", callbackUUID)
	}

	// Add a chord callback to all tasks
	for _, signature := range group.Tasks {
		signature.ChordCallback = callback
	}

	return &Chord{Group: group, Callback: callback}, nil
}