-
Notifications
You must be signed in to change notification settings - Fork 683
/
consulwatchman.go
116 lines (99 loc) · 2.87 KB
/
consulwatchman.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package watt
import (
"fmt"
consulapi "github.com/hashicorp/consul/api"
"github.com/datawire/ambassador/pkg/consulwatch"
"github.com/datawire/ambassador/pkg/dlog"
"github.com/datawire/ambassador/pkg/supervisor"
)
type consulEvent struct {
WatchId string
Endpoints consulwatch.Endpoints
}
type consulwatchman struct {
WatchMaker IConsulWatchMaker
watchesCh <-chan []ConsulWatchSpec
watched map[string]*supervisor.Worker
}
type ConsulWatchMaker struct {
aggregatorCh chan<- consulEvent
}
func (m *ConsulWatchMaker) MakeConsulWatch(spec ConsulWatchSpec) (*supervisor.Worker, error) {
consulConfig := consulapi.DefaultConfig()
consulConfig.Address = spec.ConsulAddress
// TODO: Should we really allocated a Consul client per Service watch? Not sure... there some design stuff here
// May be multiple consul clusters
// May be different connection parameters on the consulConfig
// Seems excessive...
consul, err := consulapi.NewClient(consulConfig)
if err != nil {
return nil, err
}
worker := &supervisor.Worker{
Name: fmt.Sprintf("consul:%s", spec.WatchId()),
Work: func(p *supervisor.Process) error {
logger := dlog.GetLogger(p.Context()).
StdLogger(dlog.LogLevelInfo)
w, err := consulwatch.New(consul, logger, spec.Datacenter, spec.ServiceName, true)
if err != nil {
p.Logf("failed to setup new consul watch %v", err)
return err
}
w.Watch(func(endpoints consulwatch.Endpoints, e error) {
endpoints.Id = spec.Id
m.aggregatorCh <- consulEvent{spec.WatchId(), endpoints}
})
_ = p.Go(func(p *supervisor.Process) error {
x := w.Start()
if x != nil {
p.Logf("failed to start service watcher %v", x)
return x
}
return nil
})
<-p.Shutdown()
w.Stop()
return nil
},
Retry: true,
}
return worker, nil
}
func (w *consulwatchman) Work(p *supervisor.Process) error {
p.Ready()
for {
select {
case watches := <-w.watchesCh:
found := make(map[string]*supervisor.Worker)
p.Debugf("processing %d consul watches", len(watches))
for _, cw := range watches {
worker, err := w.WatchMaker.MakeConsulWatch(cw)
if err != nil {
p.Logf("failed to create consul watch %v", err)
continue
}
if _, exists := w.watched[worker.Name]; exists {
found[worker.Name] = w.watched[worker.Name]
} else {
p.Debugf("add consul watcher %s\n", worker.Name)
p.Supervisor().Supervise(worker)
w.watched[worker.Name] = worker
found[worker.Name] = worker
}
}
// purge the watches that no longer are needed because they did not come through the in the latest
// report
for workerName, worker := range w.watched {
if _, exists := found[workerName]; !exists {
p.Debugf("remove consul watcher %s\n", workerName)
worker.Shutdown()
worker.Wait()
}
}
w.watched = found
case <-p.Shutdown():
p.Debugf("shutdown initiated")
return nil
}
}
}