forked from danmux/floe
-
Notifications
You must be signed in to change notification settings - Fork 14
/
hub_pend.go
173 lines (152 loc) · 5.24 KB
/
hub_pend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package hub
import (
"fmt"
"strings"
"time"
"github.com/floeit/floe/client"
"github.com/floeit/floe/config"
nt "github.com/floeit/floe/config/nodetype"
"github.com/floeit/floe/event"
"github.com/floeit/floe/log"
)
// This file contains all functions that deal with events that may or may not
// be serviced on this host i.e. pending runs - it uses the hub client to ask
// other nodes in the cluster if they can take a pending run.
// serviceLists periodically attempts to dispatch pending flows.
//
// Every five seconds it makes one pass over the pending list via
// distributeAllPending; any error from a pass is logged and the next pass
// still happens. The loop never exits.
//
// TODO: also time out any active flows that are past their deadline.
func (h *Hub) serviceLists() {
	tick := time.Tick(5 * time.Second)
	for {
		<-tick
		if err := h.distributeAllPending(); err != nil {
			log.Error(err)
		}
	}
}
// distributeAllPending loops through all pending runs, assessing whether each
// can be run, and dispatches it.
//
// When no other hosts are configured, each pend is executed locally via
// ExecutePending. Otherwise it is offered to every host whose config has been
// received and whose tags match the flow's HostTags, stopping at the first
// host that accepts it. Successfully dispatched pends are removed from the
// pending list; a failure to persist that removal is logged, not returned.
//
// An error from local execution of one pend does not abort the pass: the
// remaining pends are still attempted so one failing pend cannot starve the
// queue behind it. The first such error is returned; later ones are logged.
func (h *Hub) distributeAllPending() error {
	var firstErr error
	for _, p := range h.runs.allPends() {
		log.Debugf("<%s> - pending - attempt dispatch", p)

		if len(h.hosts) == 0 {
			log.Debugf("<%s> - pending - no hosts configured running job locally", p)
			ok, err := h.ExecutePending(p)
			if err != nil {
				// Keep going so the pends queued after this one still get a chance.
				if firstErr == nil {
					firstErr = err
				} else {
					log.Errorf("<%s> - pending - local execution failed: %v", p, err)
				}
				continue
			}
			if !ok {
				log.Debugf("<%s> - pending - could not run job locally yet", p)
				continue
			}
			log.Debugf("<%s> - pending - job started locally", p)
			if err := h.removePend(p); err != nil {
				log.Error("could not save pending removal", err)
			}
			continue
		}

		// Find candidate hosts that have a superset of the tags for the pending flow.
		var candidates []*client.FloeHost
		for _, host := range h.hosts {
			cfg := host.GetConfig()
			if cfg.HostID == "" {
				continue // we have not communicated with the other host yet
			}
			log.Debugf("<%s> - pending - testing host %s with host tags: %v", p, cfg.HostID, cfg.Tags)
			if cfg.TagsMatch(p.Flow.HostTags) {
				log.Debugf("<%s> - pending - found matching host %s with host tags: %v", p, cfg.HostID, cfg.Tags)
				candidates = append(candidates, host)
			}
		}
		log.Debugf("<%s> - pending - found %d candidate hosts", p, len(candidates))

		// Attempt to send it to any of the candidates; the first to accept wins.
		launched := false
		for _, host := range candidates {
			if !host.AttemptExecute(p) {
				continue
			}
			log.Debugf("<%s> - pending - executed on <%s>", p, host.GetConfig().HostID)
			// remove from our pending list
			if err := h.removePend(p); err != nil {
				log.Error("could not save pending removal", err)
			}
			launched = true
			break
		}
		if !launched {
			log.Debugf("<%s> - pending - no available host yet", p)
		}
		// TODO check pending queue for any pending run that is over age and send alert
	}
	return firstErr
}
// pendFlowFromTrigger uses the subscription-fired event e to put any flows
// with matching triggers on the pending queue.
//
// e.Tag must start with inboundPrefix and have at least one character after
// it. The character immediately following the prefix is skipped (presumably a
// separator — confirm against where inbound tags are built), and the rest is
// the trigger type. Tags that fail this are rejected with an error.
//
// Flows whose FlowFile cannot be loaded are logged and skipped. An error
// adding a flow to the pending list is returned immediately, so any later
// matching flows are not pended in that call.
func (h *Hub) pendFlowFromTrigger(e event.Event) error {
	if !strings.HasPrefix(e.Tag, inboundPrefix) {
		return fmt.Errorf("event %s dispatched to triggers does not have inbound tag prefix", e.Tag)
	}
	// Guard the slice below: a tag that is exactly the prefix would make
	// e.Tag[len(inboundPrefix)+1:] run past the end of the string and panic.
	if len(e.Tag) <= len(inboundPrefix) {
		return fmt.Errorf("event %s dispatched to triggers has no trigger type after the inbound tag prefix", e.Tag)
	}
	triggerType := e.Tag[len(inboundPrefix)+1:]
	log.Debugf("attempt to trigger type:<%s> (specified flow: %v)", triggerType, e.RunRef.FlowRef)

	// find any Flows with subs matching this event
	foundFlows := h.config.FindFlowsByTriggers(triggerType, e.RunRef.FlowRef, e.Opts)
	if len(foundFlows) == 0 {
		log.Debugf("no matching flow for type:'%s' (specified flow: %v)", triggerType, e.RunRef.FlowRef)
		return nil
	}

	// add each flow to the pending list
	for _, ff := range foundFlows {
		// make sure the flow has loaded in any references
		if ff.FlowFile != "" {
			log.Debugf("<%s> - getting flow from file '%s'", ff.Ref, ff.FlowFile)
			if err := ff.Load(h.cachePath); err != nil {
				log.Errorf("<%s> - could not load in the flow from FlowFile: '%s' - %v", ff.Ref, ff.FlowFile, err)
				continue
			}
		}
		// get the matched trigger node opts, but override with any matching from the event
		opts := nt.MergeOpts(ff.Matched.Opts, e.Opts)
		// add the flow to the pending list making note of the node and opts that triggered it
		ref, err := h.addToPending(ff.Flow, h.hostID, ff.Matched.Ref, opts)
		if err != nil {
			return err
		}
		log.Debugf("<%s> - from trigger type '%s' added to pending", ref, triggerType)
	}
	return nil
}
// addToPending records flow as a pending run (noting the host, the trigger
// node and the trigger opts) and, only if that succeeded, publishes a
// tagStateChange event with action "add-pend" for the new run.
//
// The RunRef from the underlying runs store is returned in both the success
// and error cases, along with any error from that store.
func (h *Hub) addToPending(flow *config.Flow, hostID string, trig config.NodeRef, opts nt.Opts) (event.RunRef, error) {
	ref, err := h.runs.addToPending(flow, hostID, trig, opts)
	if err != nil {
		return ref, err
	}

	added := event.Event{
		RunRef: ref,
		Tag:    tagStateChange,
		Good:   true,
		Opts:   nt.Opts{"action": "add-pend"},
	}
	h.queue.Publish(added)

	return ref, nil
}
// removePend removes pend from the pending list and, when that call reports
// an entry was actually removed, publishes a tagStateChange event with action
// "remove-pend".
//
// The activate event could be used to drop the run from front-end lists
// instead, but this event fires even when no activate has been sent.
//
// Any error returned comes from the runs store's removePend — per the original
// note, that is an error persisting the pending list.
func (h *Hub) removePend(pend Pend) error {
	removed, err := h.runs.removePend(pend)
	switch {
	case err != nil:
		return err
	case !removed:
		// Nothing was taken off the list, so there is no state change to announce.
		return nil
	}

	h.queue.Publish(event.Event{
		RunRef: pend.Ref,
		Tag:    tagStateChange,
		Good:   true,
		Opts:   nt.Opts{"action": "remove-pend"},
	})
	return nil
}