forked from coreos/fleet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
105 lines (90 loc) · 2.88 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package registry
import (
"errors"
"strings"
"time"
etcdErr "github.com/coreos/fleet/third_party/github.com/coreos/etcd/error"
"github.com/coreos/fleet/third_party/github.com/coreos/go-etcd/etcd"
log "github.com/coreos/fleet/third_party/github.com/golang/glog"
"github.com/coreos/fleet/event"
)
// EventStream watches etcd for changes and translates the raw responses into
// fleet events. Construct one with NewEventStream and start it with Stream.
//
// NOTE: NewEventStream fills this struct with a positional literal, so the
// field order must not change without updating that constructor.
type EventStream struct {
// etcd is the client handed to the watch loop started by Stream.
etcd *etcd.Client
// registry supplies the keyPrefix that is watched, and its methods are used
// as some of the response filters.
registry *EtcdRegistry
}
// NewEventStream builds an EventStream on top of the given etcd client.
//
// The supplied registry must be an *EtcdRegistry; any other Registry
// implementation is rejected with an error and a nil EventStream.
func NewEventStream(client *etcd.Client, registry Registry) (*EventStream, error) {
	if etcdReg, isEtcd := registry.(*EtcdRegistry); isEtcd {
		stream := &EventStream{
			etcd:     client,
			registry: etcdReg,
		}
		return stream, nil
	}

	return nil, errors.New("EventStream currently only works with EtcdRegistry")
}
// Stream starts the event pipeline and returns immediately.
//
// Two goroutines are launched: one watches etcd under the registry's key
// prefix beginning at index idx, and the other runs every etcd response
// through the filter list below (in order) and forwards each resulting event
// to eventchan. Both goroutines are handed the same stop channel.
func (es *EventStream) Stream(idx uint64, eventchan chan *event.Event, stop chan bool) {
	// Unbuffered hand-off between the watcher and the translator.
	responses := make(chan *etcd.Response)

	// A single response may match several translators; each match yields
	// its own event, emitted in the order listed here.
	translators := []func(*etcd.Response) *event.Event{
		filterEventJobDestroyed,
		filterEventJobScheduled,
		filterEventJobUnscheduled,
		es.filterJobTargetStateChanges,
		filterEventMachineCreated,
		filterEventMachineRemoved,
		es.filterEventJobOffered,
		filterEventJobBidSubmitted,
	}

	go watch(es.etcd, idx, responses, es.registry.keyPrefix, stop)
	go pipe(responses, translators, eventchan, stop)
}
// pipe reads etcd responses from etcdchan, runs each one through every filter
// in order, and forwards every non-nil event the filters produce to eventchan.
// It returns as soon as stop becomes readable.
//
// The send on eventchan also selects on stop: without that, a consumer that
// has stopped reading would leave this goroutine blocked on the send forever,
// even after shutdown was requested.
func pipe(etcdchan chan *etcd.Response, filters []func(resp *etcd.Response) *event.Event, eventchan chan *event.Event, stop chan bool) {
	for {
		select {
		case <-stop:
			return
		case resp := <-etcdchan:
			log.V(1).Infof("Received response from etcd watcher: Action=%s ModifiedIndex=%d Key=%s", resp.Action, resp.Node.ModifiedIndex, resp.Node.Key)
			for _, f := range filters {
				ev := f(resp)
				if ev == nil {
					// This filter does not apply to the response.
					continue
				}

				log.V(1).Infof("Translated response(ModifiedIndex=%d) to event(Type=%s)", resp.Node.ModifiedIndex, ev.Type)

				// Deliver the event, but give up if we are asked to stop
				// while the receiver is not ready.
				select {
				case eventchan <- ev:
				case <-stop:
					return
				}
			}
		}
	}
}
// watch repeatedly issues recursive etcd watches on key, starting at index
// idx, and sends every successful response to etcdchan. After each success
// the next watch starts at the response's ModifiedIndex + 1.
//
// Error handling:
//   - EcodeEventIndexCleared: the index is bumped by one and the watch is
//     retried immediately.
//   - anything else (including non-etcd errors): back off for one second
//     before retrying so the etcd server is not hammered.
//
// The loop exits when stop becomes readable. Both the hand-off to etcdchan
// and the back-off wait select on stop, so the goroutine cannot be left
// blocked on a receiver that has already exited, and shutdown is not delayed
// by a pending back-off.
func watch(client *etcd.Client, idx uint64, etcdchan chan *etcd.Response, key string, stop chan bool) {
	for {
		// Non-blocking check so a stop request is honoured before a new
		// watcher is created.
		select {
		case <-stop:
			log.V(1).Infof("Gracefully closing etcd watch loop: key=%s", key)
			return
		default:
		}

		log.V(1).Infof("Creating etcd watcher: key=%s, index=%d, machines=%s", key, idx, strings.Join(client.GetCluster(), ","))
		resp, err := client.Watch(key, idx, true, nil, stop)
		if err == nil {
			idx = resp.Node.ModifiedIndex + 1

			// Hand the response over, unless we are told to stop while the
			// receiver is not ready.
			select {
			case etcdchan <- resp:
			case <-stop:
				log.V(1).Infof("Gracefully closing etcd watch loop: key=%s", key)
				return
			}
			continue
		}

		log.Errorf("etcd watcher returned error: key=%s, err=\"%s\"", key, err.Error())

		if etcdError, ok := err.(*etcd.EtcdError); ok && etcdError.ErrorCode == etcdErr.EcodeEventIndexCleared {
			// This is racy, but adding one to the last known index
			// will help get this watcher back into the range of
			// etcd's internal event history
			idx = idx + 1
			continue
		}

		// Let's not slam the etcd server in the event that we know
		// an unexpected error occurred. The wait is interruptible so a stop
		// request does not have to sit out the full back-off.
		select {
		case <-stop:
			log.V(1).Infof("Gracefully closing etcd watch loop: key=%s", key)
			return
		case <-time.After(time.Second):
		}
	}
}