-
Notifications
You must be signed in to change notification settings - Fork 13
/
consul.go
196 lines (173 loc) · 5.99 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package prometheus
import (
"bytes"
"context"
"fmt"
"net/url"
"strings"
"text/template"
"time"
"github.com/circonus-labs/circonus-unified-agent/config"
"github.com/hashicorp/consul/api"
)
// ConsulConfig holds the settings used to discover scrape targets through a Consul agent.
type ConsulConfig struct {
// Agent is the address of the Consul agent. The address must contain a hostname or an IP address
// and optionally a port (format: "host:port"). When empty, the Consul client's default address is kept.
Agent string `toml:"agent"`
// Queries lists the service discovery queries to run against the Consul catalog.
Queries []*ConsulQuery `toml:"query"`
// QueryInterval is the wait between the end of one service refresh and the start of the next.
QueryInterval config.Duration `toml:"query_interval"`
// Enabled is read from the "enabled" key.
// NOTE(review): not read anywhere in this file; presumably checked by the caller before startConsul — confirm.
Enabled bool `toml:"enabled"`
}
// ConsulQuery describes one Consul service discovery query, together with the
// templates used to turn each discovered service instance into a scrape URL
// and a set of extra tags.
type ConsulQuery struct {
// ServiceExtraTags maps a tag name to a text/template string; the "join" function is available in these templates.
ServiceExtraTags map[string]string `toml:"tags"` // Extra tags to add to metrics found in Consul
// serviceURLTemplate is ServiceURL parsed as a text/template; it is set by startConsul.
serviceURLTemplate *template.Template
// serviceExtraTagsTemplate holds the parsed template for each entry of ServiceExtraTags;
// entries whose template fails to parse are left out. It is set by startConsul.
serviceExtraTagsTemplate map[string]*template.Template
ServiceName string `toml:"name"` // A name of the searched services (not ID)
ServiceTag string `toml:"tag"` // A tag of the searched services
ServiceDc string `toml:"dc"` // A DC of the searched services
// NOTE(review): in this file ServiceURL is executed as a text/template with the discovered
// catalog service as data; no separate hostname substitution is visible here — confirm the
// wording of the trailing comment below.
ServiceURL string `toml:"url"` // A template URL of the Prometheus gathering interface. The hostname part of the URL will be replaced by discovered address and port.
lastQueryFailed bool // Store last error status and change log level depending on repeated occurrence
}
// startConsul prepares the configured Consul queries and starts the background
// goroutine that keeps the discovered Consul services up to date.
//
// It creates a Consul API client (pointing at p.ConsulConfig.Agent when set,
// at the client's default address otherwise), parses the URL template and the
// extra tag templates of every query, and drops queries whose URL template
// does not parse. A tag template that does not parse is logged and skipped,
// but its query is kept.
//
// The goroutine refreshes the services once immediately and then again
// QueryInterval after each refresh completes, until ctx is cancelled. It is
// registered on p.wg so that callers can wait for it to finish.
//
// An error is returned only when the Consul client cannot be created.
func (p *Prometheus) startConsul(ctx context.Context) error {
	consulAPIConfig := api.DefaultConfig()
	if p.ConsulConfig.Agent != "" {
		consulAPIConfig.Address = p.ConsulConfig.Agent
	}

	consul, err := api.NewClient(consulAPIConfig)
	if err != nil {
		return fmt.Errorf("cannot connect to the Consul agent: %w", err)
	}

	// Allow to use join function in tags. The map is the same for every
	// template, so it is built once instead of once per query.
	templateFunctions := template.FuncMap{"join": strings.Join}

	// Parse the template for metrics URL, drop queries with template parse errors.
	// Surviving queries are compacted in place towards the front of the slice.
	i := 0
	for _, q := range p.ConsulConfig.Queries {
		serviceURLTemplate, err := template.New("URL").Parse(q.ServiceURL)
		if err != nil {
			p.Log.Errorf("Could not parse the Consul query URL template (%s), skipping it. Error: %s", q.ServiceURL, err)
			continue
		}
		q.serviceURLTemplate = serviceURLTemplate

		// Parse the tag value templates
		q.serviceExtraTagsTemplate = make(map[string]*template.Template)
		for tagName, tagTemplateString := range q.ServiceExtraTags {
			tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString)
			if err != nil {
				p.Log.Errorf("Could not parse the Consul query Extra Tag template (%s), skipping it. Error: %s", tagTemplateString, err)
				continue
			}
			q.serviceExtraTagsTemplate[tagName] = tagTemplate
		}

		p.ConsulConfig.Queries[i] = q
		i++
	}

	// Prevent memory leak by erasing truncated values
	for j := i; j < len(p.ConsulConfig.Queries); j++ {
		p.ConsulConfig.Queries[j] = nil
	}
	p.ConsulConfig.Queries = p.ConsulConfig.Queries[:i]

	catalog := consul.Catalog()
	interval := time.Duration(p.ConsulConfig.QueryInterval)

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()

		// Store last error status and change log level depending on repeated occurrence
		refreshFailed := false

		if err := p.refreshConsulServices(catalog); err != nil {
			refreshFailed = true
			p.Log.Errorf("Unable to refresh Consul services: %v", err)
		}

		// One reusable timer instead of a fresh time.After per iteration; it is
		// released as soon as the goroutine exits.
		timer := time.NewTimer(interval)
		defer timer.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				err := p.refreshConsulServices(catalog)
				switch {
				case err != nil:
					message := fmt.Sprintf("Unable to refresh Consul services: %v", err)
					if refreshFailed {
						p.Log.Debug(message)
					} else {
						p.Log.Warn(message)
					}
					refreshFailed = true
				case refreshFailed:
					refreshFailed = false
					p.Log.Info("Successfully refreshed Consul services after previous errors")
				}

				// The channel was just drained, so Reset is safe. Re-arming only
				// after the refresh keeps the wait measured from its completion.
				timer.Reset(interval)
			}
		}
	}()

	return nil
}
// refreshConsulServices runs every configured query against the Consul catalog
// and replaces p.consulServices with the scrape targets built from the results.
//
// A failing catalog request aborts the refresh and returns the error; the
// stored targets are not replaced in that case. A failure to build the scrape
// URL of a service instance is only logged (at warn level on the first
// failure of a query, at debug level on repeated ones) and stops processing
// the remaining instances of that query.
func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
	p.Log.Debugf("Refreshing Consul services")

	discovered := make(map[string]URLAndAddress)

	for _, query := range p.ConsulConfig.Queries {
		// An empty ServiceDc leaves Datacenter at its zero value, exactly as
		// if it had never been assigned.
		opts := &api.QueryOptions{Datacenter: query.ServiceDc}

		// Request services from Consul
		instances, _, err := c.Service(query.ServiceName, query.ServiceTag, opts)
		if err != nil {
			return err
		}

		if len(instances) == 0 {
			p.Log.Debugf("Queried Consul for Service (%s, %s) but did not find any instances", query.ServiceName, query.ServiceTag)
			continue
		}
		p.Log.Debugf("Queried Consul for Service (%s, %s) and found %d instances", query.ServiceName, query.ServiceTag, len(instances))

		for _, instance := range instances {
			target, err := p.getConsulServiceURL(query, instance)
			if err != nil {
				message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", query.ServiceName, query.ServiceTag, err)
				// Lower the log level when the same query keeps failing.
				if query.lastQueryFailed {
					p.Log.Debug(message)
				} else {
					p.Log.Warn(message)
				}
				query.lastQueryFailed = true
				break
			}

			// Announce the recovery once after a previous failure.
			if query.lastQueryFailed {
				p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", query.ServiceName, query.ServiceTag)
			}
			query.lastQueryFailed = false

			// The rendered URL string is the map key, so instances that
			// resolve to the same URL collapse into one entry.
			key := target.URL.String()
			p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", query.ServiceName, query.ServiceTag, key)
			discovered[key] = *target
		}
	}

	// Swap in the new set of targets under the lock.
	p.lock.Lock()
	p.consulServices = discovered
	p.lock.Unlock()

	return nil
}
// getConsulServiceURL renders the URL template and the extra tag templates of
// query q against the Consul catalog service s and returns the resulting
// scrape target.
//
// An error is returned when a template fails to execute or when the rendered
// URL cannot be parsed. URL and OriginalURL of the returned value refer to the
// same parsed URL.
func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) {
	// Scratch buffer shared by all template executions below.
	var rendered bytes.Buffer

	if err := q.serviceURLTemplate.Execute(&rendered, s); err != nil {
		return nil, err
	}

	target, err := url.Parse(rendered.String())
	if err != nil {
		return nil, err
	}

	tags := make(map[string]string)
	for name, tmpl := range q.serviceExtraTagsTemplate {
		// Clear what the previous template wrote before rendering the next one.
		rendered.Reset()
		if err := tmpl.Execute(&rendered, s); err != nil {
			return nil, err
		}
		tags[name] = rendered.String()
	}

	p.Log.Debugf("Will scrape metrics from Consul Service %s", target.String())

	result := &URLAndAddress{
		URL:         target,
		OriginalURL: target,
		Tags:        tags,
	}
	return result, nil
}