/
consul.go
132 lines (116 loc) · 4.18 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package discovery
import (
"fmt"
"os"
"sort"
"sync"
"github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
var collector *prometheus.GaugeVec
func init() {
collector = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "containerpilot_watch_instances",
Help: "gauge of instances found for each ContainerPilot watch, partitioned by service",
}, []string{"service"})
prometheus.MustRegister(collector)
}
// Consul wraps the service discovery backend for the Hashicorp Consul client
// and tracks the state of all watched dependencies.
type Consul struct {
api.Client
lock sync.RWMutex
watchedServices map[string][]*api.ServiceEntry
}
// NewConsul creates a new service discovery backend for Consul
func NewConsul(config interface{}) (*Consul, error) {
var consulConfig *api.Config
var err error
switch t := config.(type) {
case string:
consulConfig, err = configFromURI(t)
case map[string]interface{}:
consulConfig, err = configFromMap(t)
default:
return nil, fmt.Errorf("no discovery backend defined")
}
if err != nil {
return nil, err
}
if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
consulConfig.Token = token
}
client, err := api.NewClient(consulConfig)
if err != nil {
return nil, err
}
watchedServices := make(map[string][]*api.ServiceEntry)
consul := &Consul{*client, sync.RWMutex{}, watchedServices}
return consul, nil
}
// UpdateTTL wraps the Consul.Agent's UpdateTTL method, and is used to set a TTL
// check to the passing state
func (c *Consul) UpdateTTL(checkID, output, status string) error {
return c.Agent().UpdateTTL(checkID, output, status)
}
// CheckRegister wraps the Consul.Agent's CheckRegister method,
// is used to register a new service with the local agent
func (c *Consul) CheckRegister(check *api.AgentCheckRegistration) error {
return c.Agent().CheckRegister(check)
}
// ServiceRegister wraps the Consul.Agent's ServiceRegister method,
// is used to register a new service with the local agent
func (c *Consul) ServiceRegister(service *api.AgentServiceRegistration) error {
return c.Agent().ServiceRegister(service)
}
// ServiceDeregister wraps the Consul.Agent's ServiceDeregister method,
// and is used to deregister a service from the local agent
func (c *Consul) ServiceDeregister(serviceID string) error {
return c.Agent().ServiceDeregister(serviceID)
}
// CheckForUpstreamChanges requests the set of healthy instances of a
// service from Consul and checks whether there has been a change since
// the last check.
func (c *Consul) CheckForUpstreamChanges(backendName, backendTag, dc string) (didChange, isHealthy bool) {
opts := &api.QueryOptions{Datacenter: dc}
instances, meta, err := c.Health().Service(backendName, backendTag, true, opts)
if err != nil {
log.Warnf("failed to query %v: %s [%v]", backendName, err, meta)
return false, false
}
collector.WithLabelValues(backendName).Set(float64(len(instances)))
isHealthy = len(instances) > 0
didChange = c.compareAndSwap(backendName, instances)
return didChange, isHealthy
}
// returns true if any addresses for the service changed and updates
// the internal state
func (c *Consul) compareAndSwap(service string, new []*api.ServiceEntry) bool {
c.lock.Lock()
defer c.lock.Unlock()
existing := c.watchedServices[service]
c.watchedServices[service] = new
return compareForChange(existing, new)
}
// Compare the two arrays to see if the address or port has changed
// or if we've added or removed entries.
func compareForChange(existing, newEntries []*api.ServiceEntry) (changed bool) {
if len(existing) != len(newEntries) {
return true
}
sort.Sort(ByServiceID(existing))
sort.Sort(ByServiceID(newEntries))
for i, ex := range existing {
if ex.Service.Address != newEntries[i].Service.Address ||
ex.Service.Port != newEntries[i].Service.Port {
return true
}
}
return false
}
// ByServiceID implements the Sort interface because Go can't sort without it.
type ByServiceID []*api.ServiceEntry
func (se ByServiceID) Len() int { return len(se) }
func (se ByServiceID) Swap(i, j int) { se[i], se[j] = se[j], se[i] }
func (se ByServiceID) Less(i, j int) bool { return se[i].Service.ID < se[j].Service.ID }