forked from st3v/go-plugins
/
watcher.go
142 lines (124 loc) · 2.55 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package eureka
import (
"errors"
"time"
"github.com/hudl/fargo"
"github.com/micro/go-micro/registry"
)
type eurekaWatcher struct {
conn fargoConnection
exit chan bool
results chan *registry.Result
}
func newWatcher(conn fargoConnection, opts ...registry.WatchOption) registry.Watcher {
var wo registry.WatchOptions
for _, o := range opts {
o(&wo)
}
w := &eurekaWatcher{
conn: conn,
exit: make(chan bool),
results: make(chan *registry.Result),
}
// watch a single service
if len(wo.Service) > 0 {
done := make(chan struct{})
ch := conn.ScheduleAppUpdates(wo.Service, false, done)
go w.watch(ch, done)
go func() {
<-w.exit
close(done)
}()
return w
}
// watch all services
go w.poll()
return w
}
func (e *eurekaWatcher) poll() {
// list service ticker
t := time.NewTicker(time.Second * 10)
done := make(chan struct{})
services := make(map[string]<-chan fargo.AppUpdate)
for {
select {
case <-e.exit:
close(done)
return
case <-t.C:
apps, err := e.conn.GetApps()
if err != nil {
continue
}
for _, app := range apps {
if _, ok := services[app.Name]; ok {
continue
}
ch := e.conn.ScheduleAppUpdates(app.Name, false, done)
services[app.Name] = ch
go e.watch(ch, done)
}
}
}
}
func (e *eurekaWatcher) watch(ch <-chan fargo.AppUpdate, done chan struct{}) {
for {
select {
// exit on exit
case <-e.exit:
return
// exit on done
case <-done:
return
// process updates
case u := <-ch:
if u.Err != nil {
continue
}
// process instances independently
for _, instance := range u.App.Instances {
var action string
switch instance.Status {
// update
case fargo.UP:
action = "update"
// delete
case fargo.OUTOFSERVICE, fargo.UNKNOWN, fargo.DOWN:
action = "delete"
// skip
default:
continue
}
// construct the service with a single node
service := appToService(&fargo.Application{
Name: u.App.Name,
Instances: []*fargo.Instance{instance},
})
if len(service) == 0 {
continue
}
// in case we get bounced during processing
// check exit channels
select {
// send the update
case e.results <- ®istry.Result{Action: action, Service: service[0]}:
case <-done:
return
case <-e.exit:
return
}
}
}
}
}
func (e *eurekaWatcher) Next() (*registry.Result, error) {
select {
case <-e.exit:
return nil, errors.New("watcher stopped")
case r := <-e.results:
return r, nil
}
}
func (e *eurekaWatcher) Stop() {
close(e.exit)
}