forked from coreos/fleet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
117 lines (95 loc) · 2.57 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"path"
"strings"
"time"
"github.com/coreos/fleet/etcd"
"github.com/coreos/fleet/log"
"github.com/coreos/fleet/pkg"
)
const (
// Occurs when any Job's target is touched
JobTargetChangeEvent = pkg.Event("JobTargetChangeEvent")
// Occurs when any Job's target state is touched
JobTargetStateChangeEvent = pkg.Event("JobTargetStateChangeEvent")
)
type etcdEventStream struct {
etcd etcd.Client
rootPrefix string
}
func NewEtcdEventStream(client etcd.Client, rootPrefix string) pkg.EventStream {
return &etcdEventStream{client, rootPrefix}
}
// Next returns a channel which will emit an Event as soon as one of interest occurs
func (es *etcdEventStream) Next(stop chan struct{}) chan pkg.Event {
evchan := make(chan pkg.Event)
go func() {
for {
select {
case <-stop:
return
default:
}
res := watch(es.etcd, path.Join(es.rootPrefix, jobPrefix), stop)
if ev, ok := parse(res, es.rootPrefix); ok {
evchan <- ev
return
}
}
}()
return evchan
}
func parse(res *etcd.Result, prefix string) (ev pkg.Event, ok bool) {
if res == nil || res.Node == nil {
return
}
if !strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) {
return
}
switch path.Base(res.Node.Key) {
case "target-state":
ev = JobTargetStateChangeEvent
ok = true
case "target":
ev = JobTargetChangeEvent
ok = true
}
return
}
func watch(client etcd.Client, key string, stop chan struct{}) (res *etcd.Result) {
for res == nil {
select {
case <-stop:
log.V(1).Infof("Gracefully closing etcd watch loop: key=%s", key)
return
default:
req := &etcd.Watch{
Key: key,
WaitIndex: 0,
Recursive: true,
}
log.V(1).Infof("Creating etcd watcher: %v", req)
var err error
res, err = client.Wait(req, stop)
if err != nil {
log.Errorf("etcd watcher %v returned error: %v", req, err)
}
}
// Let's not slam the etcd server in the event that we know
// an unexpected error occurred.
time.Sleep(time.Second)
}
return
}